Skip to content

Commit

Permalink
Catch exception during finalize phase
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 19, 2023
1 parent 5931a4a commit d31cba0
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 85 deletions.
4 changes: 3 additions & 1 deletion src/common/task_system/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ void Task::deRegisterThreadAndFinalizeTaskIfNecessary() {
lock_t lck{mtx};
++numThreadsFinished;
if (!hasExceptionNoLock() && isCompletedNoLock()) {
finalizeIfNecessary();
try {
finalizeIfNecessary();
} catch (std::exception& e) { setExceptionNoLock(std::current_exception()); }

Check warning on line 23 in src/common/task_system/task.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/task_system/task.cpp#L23

Added line #L23 was not covered by tests
}
}

Expand Down
44 changes: 1 addition & 43 deletions src/common/task_system/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ using namespace kuzu::common;
namespace kuzu {
namespace common {

TaskScheduler::TaskScheduler(uint64_t numThreads)
: logger{LoggerUtils::getLogger(LoggerConstants::LoggerEnum::PROCESSOR)}, nextScheduledTaskID{
0} {
TaskScheduler::TaskScheduler(uint64_t numThreads) : nextScheduledTaskID{0} {
for (auto n = 0u; n < numThreads; ++n) {
threads.emplace_back([&] { runWorkerThread(); });
}
Expand All @@ -30,38 +28,6 @@ std::shared_ptr<ScheduledTask> TaskScheduler::scheduleTask(const std::shared_ptr
return scheduledTask;
}

void TaskScheduler::errorIfThereIsAnException() {
lock_t lck{mtx};
errorIfThereIsAnExceptionNoLock();
lck.unlock();
}

// Walks taskQueue from front to back and, for the first task that reports an exception,
// removes its entry from the queue and rethrows that exception to the caller. Returns
// normally when no queued task has an exception. Does not lock mtx itself.
void TaskScheduler::errorIfThereIsAnExceptionNoLock() {
    auto cursor = taskQueue.begin();
    while (cursor != taskQueue.end()) {
        // Take our own copy of the task handle before the queue entry is erased below.
        auto candidate = (*cursor)->task;
        if (!candidate->hasException()) {
            // TODO(Semih): We can optimize to stop after finding a registrable task. This is
            // because tasks after the first registrable task in the queue cannot have any thread
            // yet registered to them, so they cannot have errored.
            ++cursor;
            continue;
        }
        // The iterator is not used again after erase: rethrow_exception never returns.
        taskQueue.erase(cursor);
        std::rethrow_exception(candidate->getExceptionPtr());
    }
}

void TaskScheduler::waitAllTasksToCompleteOrError() {
while (true) {
lock_t lck{mtx};
if (taskQueue.empty()) {
return;
}
errorIfThereIsAnExceptionNoLock();
lck.unlock();
std::this_thread::sleep_for(
std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
}
}

void TaskScheduler::scheduleTaskAndWaitOrError(
const std::shared_ptr<Task>& task, processor::ExecutionContext* context) {
for (auto& dependency : task->children) {
Expand All @@ -84,14 +50,6 @@ void TaskScheduler::scheduleTaskAndWaitOrError(
}
}

// Blocks until the number of queued tasks is no longer greater than
// minimumNumTasksToScheduleMore, sleeping between polls. Each poll first calls
// errorIfThereIsAnException(), so an exception stored on a queued task is rethrown here.
//
// NOTE(review): getNumTasks() returns uint64_t while the threshold is int64_t, so the
// comparison is carried out unsigned; a negative threshold would compare as a very large
// value. getNumTasks() also reads the queue size without taking mtx.
void TaskScheduler::waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore) {
    const auto pollInterval =
        std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS);
    for (;;) {
        if (getNumTasks() <= minimumNumTasksToScheduleMore) {
            return;
        }
        errorIfThereIsAnException();
        std::this_thread::sleep_for(pollInterval);
    }
}

std::shared_ptr<ScheduledTask> TaskScheduler::getTaskAndRegister() {
lock_t lck{mtx};
if (taskQueue.empty()) {
Expand Down
10 changes: 7 additions & 3 deletions src/include/common/task_system/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class Task {

inline void setException(std::exception_ptr exceptionPtr) {
lock_t lck{mtx};
if (this->exceptionsPtr == nullptr) {
this->exceptionsPtr = exceptionPtr;
}
setExceptionNoLock(exceptionPtr);
}

inline bool hasException() {
Expand All @@ -88,6 +86,12 @@ class Task {

inline bool hasExceptionNoLock() const { return exceptionsPtr != nullptr; }

// Stores exceptionPtr as this task's exception unless one is already stored, so the first
// recorded exception is kept and later ones are dropped. Does not lock mtx itself;
// setException() takes the lock before delegating here.
inline void setExceptionNoLock(std::exception_ptr exceptionPtr) {
    if (exceptionsPtr != nullptr) {
        return;
    }
    exceptionsPtr = exceptionPtr;
}

public:
Task* parent = nullptr;
std::vector<std::shared_ptr<Task>>
Expand Down
41 changes: 3 additions & 38 deletions src/include/common/task_system/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,10 @@ struct ScheduledTask {
* is completed is removed automatically from the queue. If there is a task that raises an
* exception, the worker threads catch it and store it with the tasks. The user thread that is
* waiting on the completion of the task (or tasks) will throw the exception (the user thread could
* be waiting on a tasks through a function that waits, e.g., scheduleTaskAndWaitOrError or
* waitAllTasksToCompleteOrError.
* be waiting on a task through a function that waits, e.g., scheduleTaskAndWaitOrError).
*
* Currently there are two ways the TaskScheduler can be used:
* (1) Schedule a set of tasks T1, ..., Tk, and wait for all of them to be completed or error
* if any one of them has raised an exception (but only the exception of one task, the earliest
* scheduled, will be raised). We do not currently re-throw multiple exceptions from multiple tasks. Ex:
* schedule(T1);
* ...;
* schedule(Tk);
* waitAllTasksToCompleteOrError();
* (2) Schedule one task T and wait for T to finish or error if there was an exception raised by
* Currently there is one way the TaskScheduler can be used:
* Schedule one task T and wait for T to finish or error if there was an exception raised by
* one of the threads working on T that errored. This is simply done by the call:
* scheduleTaskAndWaitOrError(T);
*
Expand All @@ -61,45 +53,18 @@ class TaskScheduler {
void scheduleTaskAndWaitOrError(
const std::shared_ptr<Task>& task, processor::ExecutionContext* context);

// If a user, e.g., currently the copier, adds a set of tasks T1, ..., Tk, to the task scheduler
// without waiting for them to finish, the user needs to call waitAllTasksToCompleteOrError() if
// it wants to catch the errors that may have happened in T1, ..., Tk. If this function is not
// called and a T_i errors, there will be two side effects: (1) the user will never be aware
// that there was an error in some of the scheduled tasks; (2) the erroring task will not be
// removed from the task queue, so it will remain there permanently. We only remove erroring
// tasks inside waitAllTasksToCompleteOrError and scheduleTaskAndWaitOrError. Also, see the note
// below in waitAllTasksToCompleteOrError for details of the behavior when multiple tasks fail.
std::shared_ptr<ScheduledTask> scheduleTask(const std::shared_ptr<Task>& task);

// Also note that if a user has scheduled multiple concrete tasks and calls
// waitAllTasksToCompleteOrError and multiple tasks error, then waitAllTasksToCompleteOrError
// will rethrow only the exception of the first task that it observes to have an error and
// it will remove only that one. Other tasks that may have failed may not be removed
// from the task queue and remain in the queue. So for now, use this function if you
// want the system to crash if any of the tasks fails.
void waitAllTasksToCompleteOrError();

void waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore);

// Checks if there is an erroring task in the queue and if so, errors.
void errorIfThereIsAnException();

bool isTaskQueueEmpty() { return taskQueue.empty(); }
uint64_t getNumTasks() { return taskQueue.size(); }

private:
void removeErroringTask(uint64_t scheduledTaskID);

void errorIfThereIsAnExceptionNoLock();

// Functions to launch worker threads and for the worker threads to use to grab task from queue.
void runWorkerThread();
std::shared_ptr<ScheduledTask> getTaskAndRegister();

void interruptTaskIfTimeOutNoLock(processor::ExecutionContext* context);

private:
std::shared_ptr<spdlog::logger> logger;
std::mutex mtx;
std::deque<std::shared_ptr<ScheduledTask>> taskQueue;
std::atomic<bool> stopThreads{false};
Expand Down

0 comments on commit d31cba0

Please sign in to comment.