Catch exception during finalize phase #1835

Merged: 1 commit, Jul 20, 2023
4 changes: 3 additions & 1 deletion src/common/task_system/task.cpp
@@ -18,7 +18,9 @@
lock_t lck{mtx};
++numThreadsFinished;
if (!hasExceptionNoLock() && isCompletedNoLock()) {
- finalizeIfNecessary();
+ try {
+ finalizeIfNecessary();
+ } catch (std::exception& e) { setExceptionNoLock(std::current_exception()); }

[Codecov / codecov/patch warning: added line #L23 in src/common/task_system/task.cpp was not covered by tests]
}
}
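
The task.cpp change above catches an exception thrown during finalization and stores it on the task instead of letting it escape the worker thread. A minimal sketch of that pattern follows. `SketchTask`, `rethrowIfFailed`, and `runSketch` are hypothetical names used only for illustration; they are not part of the Kuzu code base, and the real `Task` tracks more state than this.

```cpp
#include <exception>
#include <mutex>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a task; not the real kuzu::common::Task.
class SketchTask {
public:
    explicit SketchTask(bool shouldThrow) : finalizeThrows{shouldThrow} {}

    // Called by a worker thread once it has finished its share of the work.
    void deregisterThreadAndFinalize() {
        std::lock_guard<std::mutex> lck{mtx};
        if (exceptionPtr == nullptr) {
            try {
                finalize();
            } catch (std::exception&) {
                // Store the exception instead of letting it escape the worker.
                setExceptionNoLock(std::current_exception());
            }
        }
    }

    // Called by the waiting (user) thread; rethrows what a worker stored.
    void rethrowIfFailed() {
        std::lock_guard<std::mutex> lck{mtx};
        if (exceptionPtr != nullptr) {
            std::rethrow_exception(exceptionPtr);
        }
    }

private:
    void finalize() {
        if (finalizeThrows) {
            throw std::runtime_error("finalize failed");
        }
    }

    // Caller must already hold mtx. Only the first exception is kept.
    void setExceptionNoLock(std::exception_ptr e) {
        if (exceptionPtr == nullptr) {
            exceptionPtr = e;
        }
    }

    std::mutex mtx;
    std::exception_ptr exceptionPtr;
    bool finalizeThrows;
};

// Returns the message seen by the waiting thread, or "" if nothing failed.
std::string runSketch(bool shouldThrow) {
    SketchTask task{shouldThrow};
    task.deregisterThreadAndFinalize();
    try {
        task.rethrowIfFailed();
    } catch (const std::exception& e) {
        return e.what();
    }
    return "";
}
```

Calling `runSketch(true)` exercises the failure path: the exception raised in `finalize()` is stored under the lock and only surfaces when the waiting side asks for it.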

44 changes: 1 addition & 43 deletions src/common/task_system/task_scheduler.cpp
@@ -8,9 +8,7 @@ using namespace kuzu::common;
namespace kuzu {
namespace common {

- TaskScheduler::TaskScheduler(uint64_t numThreads)
- : logger{LoggerUtils::getLogger(LoggerConstants::LoggerEnum::PROCESSOR)}, nextScheduledTaskID{
- 0} {
+ TaskScheduler::TaskScheduler(uint64_t numThreads) : nextScheduledTaskID{0} {
for (auto n = 0u; n < numThreads; ++n) {
threads.emplace_back([&] { runWorkerThread(); });
}
@@ -30,38 +28,6 @@ std::shared_ptr<ScheduledTask> TaskScheduler::scheduleTask(const std::shared_ptr
return scheduledTask;
}

- void TaskScheduler::errorIfThereIsAnException() {
- lock_t lck{mtx};
- errorIfThereIsAnExceptionNoLock();
- lck.unlock();
- }
-
- void TaskScheduler::errorIfThereIsAnExceptionNoLock() {
- for (auto it = taskQueue.begin(); it != taskQueue.end(); ++it) {
- auto task = (*it)->task;
- if (task->hasException()) {
- taskQueue.erase(it);
- std::rethrow_exception(task->getExceptionPtr());
- }
- // TODO(Semih): We can optimize to stop after finding a registrable task. This is
- // because tasks after the first registrable task in the queue cannot have any thread
- // yet registered to them, so they cannot have errored.
- }
- }
-
- void TaskScheduler::waitAllTasksToCompleteOrError() {
- while (true) {
- lock_t lck{mtx};
- if (taskQueue.empty()) {
- return;
- }
- errorIfThereIsAnExceptionNoLock();
- lck.unlock();
- std::this_thread::sleep_for(
- std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
- }
- }

void TaskScheduler::scheduleTaskAndWaitOrError(
const std::shared_ptr<Task>& task, processor::ExecutionContext* context) {
for (auto& dependency : task->children) {
@@ -84,14 +50,6 @@ void TaskScheduler::scheduleTaskAndWaitOrError(
}
}

- void TaskScheduler::waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore) {
- while (getNumTasks() > minimumNumTasksToScheduleMore) {
- errorIfThereIsAnException();
- std::this_thread::sleep_for(
- std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
- }
- }

std::shared_ptr<ScheduledTask> TaskScheduler::getTaskAndRegister() {
lock_t lck{mtx};
if (taskQueue.empty()) {
10 changes: 7 additions & 3 deletions src/include/common/task_system/task.h
@@ -66,9 +66,7 @@ class Task {

inline void setException(std::exception_ptr exceptionPtr) {
lock_t lck{mtx};
- if (this->exceptionsPtr == nullptr) {
- this->exceptionsPtr = exceptionPtr;
- }
+ setExceptionNoLock(exceptionPtr);
}

inline bool hasException() {
@@ -88,6 +86,12 @@ class Task {

inline bool hasExceptionNoLock() const { return exceptionsPtr != nullptr; }

+ inline void setExceptionNoLock(std::exception_ptr exceptionPtr) {
+ if (exceptionsPtr == nullptr) {
+ exceptionsPtr = exceptionPtr;
+ }
+ }

public:
Task* parent = nullptr;
std::vector<std::shared_ptr<Task>>
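
The task.h change above splits exception storage into a locking entry point (`setException`) and a `NoLock` variant for callers that already hold the mutex, such as the finalize path in task.cpp. The sketch below illustrates why the split matters, assuming the mutex is a non-recursive `std::mutex` (locking it twice from the same thread is undefined behavior). `ExceptionSlot`, `runGuarded`, and `message` are hypothetical names, not Kuzu APIs.

```cpp
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>

// Hypothetical holder for "the first exception seen"; not a Kuzu class.
class ExceptionSlot {
public:
    // Public entry point: takes the lock, then delegates.
    void set(std::exception_ptr e) {
        std::lock_guard<std::mutex> lck{mtx};
        setNoLock(e);
    }

    // Runs work while holding the lock and records any failure.
    // Calling set() here would try to lock mtx a second time.
    void runGuarded(const std::function<void()>& work) {
        std::lock_guard<std::mutex> lck{mtx};
        try {
            work();
        } catch (std::exception&) {
            setNoLock(std::current_exception());
        }
    }

    // Returns the stored exception's message, or "" if none is stored.
    std::string message() {
        std::lock_guard<std::mutex> lck{mtx};
        if (ptr == nullptr) {
            return "";
        }
        try {
            std::rethrow_exception(ptr);
        } catch (const std::exception& e) {
            return e.what();
        }
        return "";
    }

private:
    // Caller must hold mtx. The first stored exception wins.
    void setNoLock(std::exception_ptr e) {
        if (ptr == nullptr) {
            ptr = e;
        }
    }

    std::mutex mtx;
    std::exception_ptr ptr;
};
```

Because `setNoLock` keeps only the first exception, a later `set()` call with a different error leaves the stored one untouched, which matches the `exceptionsPtr == nullptr` guard in the diff.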
41 changes: 3 additions & 38 deletions src/include/common/task_system/task_scheduler.h
@@ -26,18 +26,10 @@ struct ScheduledTask {
* is completed is removed automatically from the queue. If there is a task that raises an
* exception, the worker threads catch it and store it with the tasks. The user thread that is
* waiting on the completion of the task (or tasks) will throw the exception (the user thread could
- * be waiting on a tasks through a function that waits, e.g., scheduleTaskAndWaitOrError or
- * waitAllTasksToCompleteOrError.
+ * be waiting on a tasks through a function that waits, e.g., scheduleTaskAndWaitOrError.
*
- * Currently there are two ways the TaskScheduler can be used:
- * (1) Schedule a set of tasks T1, ..., Tk, and wait for all of them to be completed or error
- * if any one them has raised an exception (but only the exception of one (the earliest scheduled)
- * task will be raised. We do not currently re-throw multiple exceptions from multiple tasks. Ex:
- * schedule(T1);
- * ...;
- * schedule(Tk);
- * waitAllTasksToCompleteOrError();
- * (2) Schedule one task T and wait for T to finish or error if there was an exception raised by
+ * Currently there is one way the TaskScheduler can be used:
+ * Schedule one task T and wait for T to finish or error if there was an exception raised by
* one of the threads working on T that errored. This is simply done by the call:
* scheduleTaskAndWaitOrError(T);
*
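
The comment above describes the remaining usage mode: a worker catches and stores an exception, and the user thread that waits on the task rethrows it. A simplified sketch of that hand-off follows. It is an assumption-laden illustration: `runOnWorkerAndWaitOrError` is a hypothetical helper, it spawns one thread and joins it rather than using a worker pool and task queue, and it catches every exception type, whereas the diff catches `std::exception&`.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical helper; the real TaskScheduler uses a thread pool and a queue.
void runOnWorkerAndWaitOrError(const std::function<void()>& work) {
    std::exception_ptr error; // written by the worker, read after join()

    std::thread worker([&] {
        try {
            work();
        } catch (...) {
            // Keep the exception alive beyond the worker's stack.
            error = std::current_exception();
        }
    });

    // join() synchronizes with the worker's completion, so reading
    // error afterwards is safe without an extra lock.
    worker.join();
    if (error != nullptr) {
        std::rethrow_exception(error);
    }
}
```

From the caller's point of view, a failure inside the worker looks like an ordinary exception thrown by the waiting call, so it can be handled with a regular `try`/`catch` on the user thread.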
@@ -61,45 +53,18 @@ class TaskScheduler {
void scheduleTaskAndWaitOrError(
const std::shared_ptr<Task>& task, processor::ExecutionContext* context);

- // If a user, e.g., currently the copier, adds a set of tasks T1, ..., Tk, to the task scheduler
- // without waiting for them to finish, the user needs to call waitAllTasksToCompleteOrError() if
- // it wants to catch the errors that may have happened in T1, ..., Tk. If this function is not
- // called and a T_i errors, there will be two side effects: (1) the user will never be aware
- // that there was an error in some of the scheduled tasks; (2) the erroring task will not be
- // removed from the task queue, so it will remain there permanently. We only remove erroring
- // tasks inside waitAllTasksToCompleteOrError and scheduleTaskAndWaitOrError. Also, see the note
- // below in waitAllTasksToCompleteOrError for details of the behavior when multiple tasks fail.
std::shared_ptr<ScheduledTask> scheduleTask(const std::shared_ptr<Task>& task);

- // Also note that if a user has scheduled multiple concrete tasks and calls
- // waitAllTasksToCompleteOrError and multiple tasks error, then waitAllTasksToCompleteOrError
- // will rethrow only the exception of the first task that it observes to have an error and
- // it will remove only that one. Other tasks that may have failed many not be removed
- // from the task queue and remain in the queue. So for now, use this function if you
- // want the system to crash if any of the tasks fails.
- void waitAllTasksToCompleteOrError();
-
- void waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore);
-
- // Checks if there is an erroring task in the queue and if so, errors.
- void errorIfThereIsAnException();

bool isTaskQueueEmpty() { return taskQueue.empty(); }
uint64_t getNumTasks() { return taskQueue.size(); }

private:
void removeErroringTask(uint64_t scheduledTaskID);

- void errorIfThereIsAnExceptionNoLock();

// Functions to launch worker threads and for the worker threads to use to grab task from queue.
void runWorkerThread();
std::shared_ptr<ScheduledTask> getTaskAndRegister();

void interruptTaskIfTimeOutNoLock(processor::ExecutionContext* context);

private:
- std::shared_ptr<spdlog::logger> logger;
std::mutex mtx;
std::deque<std::shared_ptr<ScheduledTask>> taskQueue;
std::atomic<bool> stopThreads{false};