Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init global state within pipeline #2577

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/common/task_system/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ Task::Task(uint64_t maxNumThreads) : maxNumThreads{maxNumThreads} {}

bool Task::registerThread() {
lock_t lck{mtx};
if (!hasExceptionNoLock() && canRegisterInternalNoLock()) {
if (!hasExceptionNoLock() && canRegisterNoLock()) {
numThreadsRegistered++;
return true;
}
return false;
}

void Task::deRegisterThreadAndFinalizeTaskIfNecessary() {
void Task::deRegisterThreadAndFinalizeTask() {
lock_t lck{mtx};
++numThreadsFinished;
if (!hasExceptionNoLock() && isCompletedNoLock()) {
Expand Down
4 changes: 2 additions & 2 deletions src/common/task_system/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ void TaskScheduler::runWorkerThread() {
}
try {
scheduledTask->task->run();
scheduledTask->task->deRegisterThreadAndFinalizeTaskIfNecessary();
scheduledTask->task->deRegisterThreadAndFinalizeTask();
} catch (std::exception& e) {
scheduledTask->task->setException(std::current_exception());
scheduledTask->task->deRegisterThreadAndFinalizeTaskIfNecessary();
scheduledTask->task->deRegisterThreadAndFinalizeTask();
continue;
}
}
Expand Down
14 changes: 2 additions & 12 deletions src/include/common/task_system/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ class Task {
children.push_back(std::move(child));
}

inline bool isCompletedOrHasException() {
lock_t lck{mtx};
return hasExceptionNoLock() || isCompletedNoLock();
}

inline bool isCompleted() {
lock_t lck{mtx};
return isCompletedNoLock();
}

inline bool isCompletedSuccessfully() {
lock_t lck{mtx};
return isCompletedNoLock() && !hasExceptionNoLock();
Expand All @@ -65,7 +55,7 @@ class Task {

bool registerThread();

void deRegisterThreadAndFinalizeTaskIfNecessary();
void deRegisterThreadAndFinalizeTask();

inline void setException(std::exception_ptr exceptionPtr) {
lock_t lck{mtx};
Expand All @@ -83,7 +73,7 @@ class Task {
}

private:
bool canRegisterInternalNoLock() const {
bool canRegisterNoLock() const {
return 0 == numThreadsFinished && maxNumThreads > numThreadsRegistered;
}

Expand Down
1 change: 0 additions & 1 deletion src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class CopyNodeSharedState {
common::vector_idx_t pkColumnIdx;
std::unique_ptr<common::LogicalType> pkType;
std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex;
bool isIndexReserved = false;

InQueryCallSharedState* readerSharedState;
HashAggregateSharedState* distinctSharedState;
Expand Down
4 changes: 3 additions & 1 deletion src/include/processor/processor_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ class ProcessorTask : public common::Task {

public:
ProcessorTask(Sink* sink, ExecutionContext* executionContext)
: Task{executionContext->numThreads}, sink{sink}, executionContext{executionContext} {}
: Task{executionContext->numThreads}, sharedStateInitialized{false}, sink{sink},
executionContext{executionContext} {}

void run() override;
void finalizeIfNecessary() override;
Expand All @@ -21,6 +22,7 @@ class ProcessorTask : public common::Task {
Sink* op, storage::MemoryManager* memoryManager);

private:
bool sharedStateInitialized;
Sink* sink;
ExecutionContext* executionContext;
};
Expand Down
16 changes: 4 additions & 12 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ void CopyNodeSharedState::init() {
auto indexFName = StorageUtils::getNodeIndexFName(
wal->getDirectory(), table->getTableID(), FileVersionType::ORIGINAL);
pkIndex = std::make_unique<PrimaryKeyIndexBuilder>(indexFName, *pkType);
uint64_t numRows = 0;
if (readerSharedState != nullptr) {
KU_ASSERT(distinctSharedState == nullptr);
uint64_t numRows = 0;
auto sharedState =
reinterpret_cast<function::ScanSharedState*>(readerSharedState->sharedState.get());
numRows = sharedState->numRows;
pkIndex->bulkReserve(numRows);
isIndexReserved = true;
} else {
numRows = distinctSharedState->getFactorizedTable()->getNumTuples();
}
pkIndex->bulkReserve(numRows);
}
wal->logCopyTableRecord(table->getTableID(), TableType::NODE);
wal->flushAllPages();
Expand Down Expand Up @@ -67,15 +68,6 @@ void CopyNode::initGlobalStateInternal(ExecutionContext* /*context*/) {
}

void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) {
if (sharedState->distinctSharedState != nullptr) {
std::unique_lock xLck{sharedState->mtx};
// TODO(Xiyang): we should init shared state after the previous pipeline finishes execution.
if (!sharedState->isIndexReserved) {
auto numRows = sharedState->distinctSharedState->getFactorizedTable()->getNumTuples();
sharedState->pkIndex->bulkReserve(numRows);
sharedState->isIndexReserved = true;
}
}
std::shared_ptr<DataChunkState> state;
for (auto& pos : info->columnPositions) {
if (pos.isValid()) {
Expand Down
7 changes: 2 additions & 5 deletions src/processor/operator/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,8 @@ std::unique_ptr<PhysicalOperator> PhysicalOperator::moveUnaryChild() {
}

void PhysicalOperator::initGlobalState(ExecutionContext* context) {
// Init from right to left so that we init in the same order as we decompose.
// TODO(Xiyang): this is a very implicit assumption. We should init global state during
// decomposition ideally.
for (auto i = children.size(); i > 0; --i) {
children[i - 1]->initGlobalState(context);
if (!isSource()) {
children[0]->initGlobalState(context);
}
initGlobalStateInternal(context);
}
Expand Down
3 changes: 0 additions & 3 deletions src/processor/processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ QueryProcessor::QueryProcessor(uint64_t numThreads) {
std::shared_ptr<FactorizedTable> QueryProcessor::execute(
PhysicalPlan* physicalPlan, ExecutionContext* context) {
auto lastOperator = physicalPlan->lastOperator.get();
// Init global state before decompose into pipelines. Otherwise, each pipeline will try to
// init global state. Result in global state being initialized multiple times.
lastOperator->initGlobalState(context);
auto resultCollector = reinterpret_cast<ResultCollector*>(lastOperator);
// The root pipeline(task) consists of operators and its prevOperator only, because we
// expect to have linear plans. For binary operators, e.g., HashJoin, we keep probe and its
Expand Down
4 changes: 4 additions & 0 deletions src/processor/processor_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ void ProcessorTask::run() {
// We need the lock when cloning because multiple threads can be accessing to clone,
// which is not thread safe
lock_t lck{mtx};
if (!sharedStateInitialized) {
sink->initGlobalState(executionContext);
sharedStateInitialized = true;
}
auto clonedPipelineRoot = sink->clone();
lck.unlock();
auto currentSink = (Sink*)clonedPipelineRoot.get();
Expand Down