Skip to content

Commit

Permalink
Catch exception during finalize phase
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 19, 2023
1 parent 5931a4a commit 96abfef
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 64 deletions.
4 changes: 3 additions & 1 deletion src/common/task_system/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ void Task::deRegisterThreadAndFinalizeTaskIfNecessary() {
lock_t lck{mtx};
++numThreadsFinished;
if (!hasExceptionNoLock() && isCompletedNoLock()) {
finalizeIfNecessary();
try {
finalizeIfNecessary();
} catch (std::exception& e) { setExceptionNoLock(std::current_exception()); }

Check warning on line 23 in src/common/task_system/task.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/task_system/task.cpp#L23

Added line #L23 was not covered by tests
}
}

Expand Down
27 changes: 0 additions & 27 deletions src/common/task_system/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ std::shared_ptr<ScheduledTask> TaskScheduler::scheduleTask(const std::shared_ptr
return scheduledTask;
}

void TaskScheduler::errorIfThereIsAnException() {
lock_t lck{mtx};
errorIfThereIsAnExceptionNoLock();
lck.unlock();
}

void TaskScheduler::errorIfThereIsAnExceptionNoLock() {
for (auto it = taskQueue.begin(); it != taskQueue.end(); ++it) {
auto task = (*it)->task;
Expand All @@ -49,19 +43,6 @@ void TaskScheduler::errorIfThereIsAnExceptionNoLock() {
}
}

// Polls the task queue until it is empty. On every iteration the queue is inspected under the
// scheduler mutex: an empty queue ends the wait, otherwise errorIfThereIsAnExceptionNoLock() is
// invoked before the mutex is released and the calling thread sleeps for
// THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS microseconds.
void TaskScheduler::waitAllTasksToCompleteOrError() {
    const auto pollInterval = std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS);
    for (;;) {
        {
            // The mutex is only held for the inspection, never while sleeping.
            lock_t queueGuard{mtx};
            if (taskQueue.empty()) {
                return;
            }
            errorIfThereIsAnExceptionNoLock();
        }
        std::this_thread::sleep_for(pollInterval);
    }
}

void TaskScheduler::scheduleTaskAndWaitOrError(
const std::shared_ptr<Task>& task, processor::ExecutionContext* context) {
for (auto& dependency : task->children) {
Expand All @@ -84,14 +65,6 @@ void TaskScheduler::scheduleTaskAndWaitOrError(
}
}

// Blocks the caller while getNumTasks() is greater than minimumNumTasksToScheduleMore. Between
// checks it calls errorIfThereIsAnException() and then sleeps for
// THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS microseconds.
void TaskScheduler::waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore) {
    const auto pollInterval = std::chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS);
    while (getNumTasks() > minimumNumTasksToScheduleMore) {
        errorIfThereIsAnException();
        std::this_thread::sleep_for(pollInterval);
    }
}

std::shared_ptr<ScheduledTask> TaskScheduler::getTaskAndRegister() {
lock_t lck{mtx};
if (taskQueue.empty()) {
Expand Down
10 changes: 7 additions & 3 deletions src/include/common/task_system/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ class Task {

// Thread-safe setter for the task's stored exception: acquires mtx and delegates to
// setExceptionNoLock(), which keeps the first exception recorded and ignores later ones.
// The first-exception-wins check lives only in setExceptionNoLock(), so it is not repeated here.
inline void setException(std::exception_ptr exceptionPtr) {
    lock_t lck{mtx};
    setExceptionNoLock(exceptionPtr);
}

inline bool hasException() {
Expand All @@ -88,6 +86,12 @@ class Task {

// Reports whether an exception pointer has been stored in this task. Takes no lock itself.
inline bool hasExceptionNoLock() const {
    return exceptionsPtr != nullptr;
}

// Stores exceptionPtr as the task's exception unless one is already recorded, so the first
// exception wins. Takes no lock itself; the visible callers acquire mtx before calling it.
inline void setExceptionNoLock(std::exception_ptr exceptionPtr) {
    if (exceptionsPtr != nullptr) {
        // An earlier exception is already stored; keep it.
        return;
    }
    exceptionsPtr = exceptionPtr;
}

public:
Task* parent = nullptr;
std::vector<std::shared_ptr<Task>>
Expand Down
36 changes: 3 additions & 33 deletions src/include/common/task_system/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,10 @@ struct ScheduledTask {
* is completed is removed automatically from the queue. If there is a task that raises an
* exception, the worker threads catch it and store it with the tasks. The user thread that is
* waiting on the completion of the task (or tasks) will throw the exception (the user thread could
* be waiting on a task through a function that waits, e.g., scheduleTaskAndWaitOrError or
* waitAllTasksToCompleteOrError).
* be waiting on a task through a function that waits, e.g., scheduleTaskAndWaitOrError).
*
* Currently there are two ways the TaskScheduler can be used:
* (1) Schedule a set of tasks T1, ..., Tk, and wait for all of them to be completed or error
* if any one of them has raised an exception (but only the exception of one task, the earliest
* scheduled, will be raised). We do not currently re-throw multiple exceptions from multiple tasks. Ex:
* schedule(T1);
* ...;
* schedule(Tk);
* waitAllTasksToCompleteOrError();
* (2) Schedule one task T and wait for T to finish or error if there was an exception raised by
* Currently there is one way the TaskScheduler can be used:
* Schedule one task T and wait for T to finish or error if there was an exception raised by
* one of the threads working on T that errored. This is simply done by the call:
* scheduleTaskAndWaitOrError(T);
*
Expand All @@ -61,30 +53,8 @@ class TaskScheduler {
void scheduleTaskAndWaitOrError(
const std::shared_ptr<Task>& task, processor::ExecutionContext* context);

// If a user, e.g., currently the copier, adds a set of tasks T1, ..., Tk, to the task scheduler
// without waiting for them to finish, the user needs to call waitAllTasksToCompleteOrError() if
// it wants to catch the errors that may have happened in T1, ..., Tk. If this function is not
// called and a T_i errors, there will be two side effects: (1) the user will never be aware
// that there was an error in some of the scheduled tasks; (2) the erroring task will not be
// removed from the task queue, so it will remain there permanently. We only remove erroring
// tasks inside waitAllTasksToCompleteOrError and scheduleTaskAndWaitOrError. Also, see the note
// below in waitAllTasksToCompleteOrError for details of the behavior when multiple tasks fail.
std::shared_ptr<ScheduledTask> scheduleTask(const std::shared_ptr<Task>& task);

// Also note that if a user has scheduled multiple concrete tasks and calls
// waitAllTasksToCompleteOrError and multiple tasks error, then waitAllTasksToCompleteOrError
// will rethrow only the exception of the first task that it observes to have an error and
// it will remove only that one. Other tasks that may have failed may not be removed
// from the task queue and remain in the queue. So for now, use this function if you
// want the system to crash if any of the tasks fails.
void waitAllTasksToCompleteOrError();

void waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore);

// Checks if there is an erroring task in the queue and if so, errors.
void errorIfThereIsAnException();

// True when taskQueue currently holds no entries. This accessor does not acquire mtx.
bool isTaskQueueEmpty() {
    return taskQueue.empty();
}
// Number of entries currently in taskQueue. This accessor does not acquire mtx.
uint64_t getNumTasks() {
    return taskQueue.size();
}

private:
Expand Down

0 comments on commit 96abfef

Please sign in to comment.