
Fix running task data structure logic in AbstractKafkaConnector for failing to stop task #851

Merged: 1 commit merged into linkedin:master from restart_dead_tasks on Sep 23, 2021

Conversation

@surajkn (Collaborator) commented Aug 24, 2021

There are two issues with the existing logic around how the _runningTasks structure is managed.

  1. In onAssignmentChange we prematurely remove the task from the running-task structure without
    checking whether stopTask succeeded.
  2. In restartDeadTasks we do not check whether the task was stopped successfully, and we create and
    start a new task regardless. Since the task acquire acts as a re-entrant lock, for a given pair of
    host and datastream task we can potentially end up with two tasks running in parallel.

This patch fixes that by introducing a "toStopTasks" structure which keeps track of all unassigned
tasks that need to be stopped and should not be restarted. Now _runningTasks contains only those
tasks that are healthy, or that have died and need to be restarted.

The following are the scenarios/interactions to consider:

  1. An onAssignmentChange event changes the assignment, and as a result some tasks need to be
    stopped. These tasks should not be restarted.
  2. Some task threads have died but the task assignment has not changed; these tasks need to be
    restarted by stopping the current task thread and creating and starting another task thread in
    its place. It's possible that stopping the task may fail, and we must keep retrying the stop
    before starting another thread.
  3. In rare cases it's possible that, while a task is being stopped because it was unassigned, it
    gets re-assigned back to the same host. Even in this case we must make sure to stop the current
    task thread first and then start another thread.
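The bookkeeping described above can be sketched as two synchronized maps, with an unassigned task always moved out of the running map even when stopping it fails. This is a minimal illustrative sketch; the class, the Entry type, and the method names are hypothetical stand-ins, not Brooklin's actual internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the _runningTasks / _tasksToStop bookkeeping.
class TaskRegistry {
  static class Entry {
    volatile boolean stopped;
  }

  private final Map<String, Entry> runningTasks = new HashMap<>();
  private final Map<String, Entry> tasksToStop = new HashMap<>();

  void assign(String task) {
    synchronized (runningTasks) {
      runningTasks.put(task, new Entry());
    }
  }

  // Scenario 1: an unassigned task always leaves runningTasks, even if the stop
  // attempt later fails; it must never be restarted. Lock order: runningTasks
  // first, then tasksToStop.
  void unassign(String task) {
    synchronized (runningTasks) {
      Entry entry = runningTasks.remove(task);
      if (entry != null) {
        synchronized (tasksToStop) {
          tasksToStop.put(task, entry);
        }
      }
    }
  }

  // Scenarios 2 and 3: a periodic pass keeps retrying pending stops and only
  // drops entries whose stop attempt actually succeeded.
  void retryPendingStops() {
    synchronized (tasksToStop) {
      tasksToStop.entrySet().removeIf(e -> tryStop(e.getValue()));
    }
  }

  // stand-in for stopTask; a real stop can fail and leave the entry in place
  boolean tryStop(Entry entry) {
    entry.stopped = true;
    return entry.stopped;
  }

  int runningTaskCount() {
    synchronized (runningTasks) {
      return runningTasks.size();
    }
  }

  int pendingStopCount() {
    synchronized (tasksToStop) {
      return tasksToStop.size();
    }
  }

  public static void main(String[] args) {
    TaskRegistry registry = new TaskRegistry();
    registry.assign("host-task-1");
    registry.unassign("host-task-1");
    registry.retryPendingStops();
  }
}
```

With this shape, the restart path only ever looks at the running map, so a task pending stop can never be restarted by mistake.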


synchronized (_runningTasks) {
toRemoveTasks.forEach(_runningTasks::remove);
}
}
Collaborator:

Earlier we had an issue with tasks that weren't stopped properly but were removed from running tasks. Now such tasks won't be removed. Other than logging that an attempt to stop a task failed or timed out, do we have an idea on how to deal with such "zombie" tasks? If they'll be living in-memory indefinitely, can they be a source for a memory leak?

Collaborator (Author):

In the "restartDeadTasks" method we check again whether a given task is "dead", and if so it will attempt to stop it again. So if it's a transient error, the task should eventually be stopped and removed. At least this is my understanding; please correct me if I am missing something.

Collaborator:

It is important to understand the behavior if the thread is interrupted multiple times. Basically, with this logic the task will not be restarted if the task thread goes zombie; instead we wait for the task thread to die.

Collaborator:

For non-transient errors, we will never be able to start a task to replace this dead task that cannot be stopped. This can lead to stopped-partitions problems that will be very hard to debug and root-cause.

Please ensure that you add a metric to track when we are unable to stop a task, so that we can quickly check this metric when we run into issues. Can you perhaps also add a comment explaining that this can lead to a stopped-partitions issue?

Also, please ensure that we fix pieces of code that don't honor the interrupt signal in the future so that we can minimize this scenario. @surajkn and I don't have the full context on what kinds of exceptions are seen such that we are unable to stop tasks. We are also not aware of how often this happens, so any insights from @vmaheshw or @jzakaryan would be helpful here.

@vmaheshw (Collaborator) left a comment:

AFAIK, this change will not work, because the task to be removed is still part of runningTasks, and the restartDeadTasks logic will attempt to restart this task thread.

if (toCancel.size() > 0) {
List<Future<DatastreamTask>> stopTaskFutures = toCancel.stream()
.map(task -> _shutdownExecutorService.submit(() -> stopTask(task, _runningTasks.get(task))))
.filter(Objects::nonNull)
Collaborator:

Do you need to filter nonNull here?

Collaborator (Author):

Not required. Removed

stopTask(task, connectorTaskEntry);
List<DatastreamTask> toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> {
try {
return stopTaskFuture.get();
Collaborator:

There should be a timeout in the get call

Collaborator (Author):

Done
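The bounded wait the reviewer asked for can be sketched with Future.get's timeout overload, which throws TimeoutException instead of blocking forever on a stuck stopTask. The names below are illustrative, not the actual Brooklin code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of waiting on a stop future with an upper bound.
class BoundedStopWait {
  static <T> T waitBounded(Future<T> future, long timeoutMs) {
    try {
      // throws TimeoutException instead of blocking indefinitely
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException | ExecutionException e) {
      return null; // stop did not complete; caller leaves the task in place
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag
      return null;
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<String> quick = pool.submit(() -> "stopped");
    System.out.println(waitBounded(quick, 5000));
    pool.shutdownNow();
  }
}
```

Returning null on timeout (rather than throwing) lets the caller treat an unstopped task the same as a failed stop and retry on the next periodic pass.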


try {
return stopTaskFuture.get();
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
Collaborator:

use Logger to print the exception stack trace.

Collaborator (Author):

Done
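Routing the exception through a logger keeps the task identity and the stack trace in one record. Brooklin uses slf4j (the surrounding code logs via _logger.warn); java.util.logging is used here only so the sketch is self-contained, and the names are illustrative.

```java
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch of replacing e.printStackTrace() with a logger call.
class StopFailureLogging {
  private static final Logger LOG = Logger.getLogger(StopFailureLogging.class.getName());

  static boolean logStopFailure(Object task, Exception e) {
    // one record carrying the task identity and the full stack trace
    LOG.log(Level.WARNING, "Failed to stop datastream task " + task, e);
    return true;
  }

  public static void main(String[] args) {
    logStopFailure("task-1", new ExecutionException(new RuntimeException("stop failed")));
  }
}
```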

@@ -256,7 +276,7 @@ protected void restartDeadTasks() {
* @param connectorTaskEntry connectorTaskEntry corresponding to the datastream task.
* @return true if it was successfully stopped, false if it was not.
Collaborator:

Fix the return javadoc

Collaborator (Author):

Done

return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

if (toRemoveTasks.size() > 0) {
Collaborator:

There is a problem with this change.

For the task to be removed: if it did not stop even after an InterruptedException, the task object will still be part of runningTasks, and restartDeadTasks will attempt to start the task again. handleAssignmentChange will not be re-queued, so restartDeadTasks will have no information that this task is not supposed to be started.

Collaborator (Author):

Thanks for this clarification. I introduced another data structure, "toStopTasks", to keep track of tasks that need to be stopped because they have been unassigned. Basically, what is happening here is that the assignment is changing; as part of that, some tasks may be unassigned, and those tasks need to be stopped. So even though stopping such tasks may fail, they still should not be part of "runningTasks".
As part of this change, unassigned tasks are added to "toStopTasks" and we periodically attempt to stop any unstopped tasks. (Currently this piggy-backs on the periodic execution of "restartDeadTasks".)

@surajkn force-pushed the restart_dead_tasks branch 2 times, most recently from 508068f to 5559569 on August 31, 2021 01:01
@somandal (Collaborator) left a comment:

@surajkn Please also add this as a potential STOPPED partition scenario to the BMM debugging runbook and point to the relevant metrics once they are added. Also ensure this is well tested as we discussed since this is a very critical piece of code.

Comment on lines 102 to 103
// multiple concurrent threads. If its required to access both of them together then the order of synchronizing
// should be _runningTasks first and then _toStopTasks.
Collaborator:

nit: Can you reword the second sentence a bit:
"If access is required to both maps then the order of synchronization must be _runningTasks followed by _toStopTasks to prevent deadlocks."

Collaborator (Author):

Done

private final Map<DatastreamTask, ConnectorTaskEntry> _runningTasks = new HashMap<>();
private final Map<DatastreamTask, ConnectorTaskEntry> _toStopTasks = new HashMap<>();
Collaborator:

nit: rename to _tasksToStop instead?

Collaborator (Author):

Done

Comment on lines 172 to 173
// This is the only place where we synchronize on both _toStopTasks and _runningTasks. But if such a case
// arises elsewhere the order of synchronization should be _runningTasks and then _toStopTasks.
Collaborator:

In the previous code block you are also accessing the lock for _toStopTasks, so this comment should be moved up and modified to talk about accessing both in this function.
Is it really necessary to leave a comment here since you've already left a comment where these are defined? In the future if someone adds another block where both are accessed, they may miss updating/removing this comment.

also nit to reword in case you decide to keep it:
"We only synchronize both _runningTasks and _toStopTasks in this function. If these need to be synchronized together in other parts of the code, please follow the correct lock ordering semantics."

Collaborator (Author):

That's a good point. It might be misleading if someone forgets to update/remove this comment when another such case comes up. Removed this comment.


_runningTasks.put(task, createKafkaConnectorTask(task));
}
// If any tasks pending stop were re-assigned to this host we explicitly call restartDeadTasks to ensure
// stop and (re)start of the task. We do this to not wait for next periodic invocation of restartDeadTasks.
Collaborator:

nit: reword: "We do this to avoid waiting for the next periodic invocation of restartDeadTasks."

Collaborator (Author):

Done

/**
* Returns the number of tasks yet to be stopped.
*/
int getToStopTaskCount() {
Collaborator:

nit: rename getTasksToStopCount() if you changed the variable name from _toStopTasks to _tasksToStop

Collaborator (Author):

Done

/**
* Returns the number of currently running tasks.
*/
int getRunningTaskCount() {
Collaborator:

nit: rename: getRunningTasksCount()

Collaborator (Author):

Done

@@ -77,6 +81,8 @@

private static final Duration CANCEL_TASK_TIMEOUT = Duration.ofSeconds(75);
private static final Duration POST_CANCEL_TASK_TIMEOUT = Duration.ofSeconds(15);
// This should be at least equal to or greater than sum of cancel and post cancel timeouts
private static final Duration CANCEL_TASK_THREAD_TIMEOUT = Duration.ofSeconds(90);
Collaborator:

Why not make variables for 75 and 15, and add them here? That'll be the best way to enforce what you're intending with your comment.

Collaborator (Author):

I removed this new variable and instead now rely on the explicit sum of the other two. I thought that would enforce the invariant better than a comment.
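The change described above can be sketched by deriving the outer timeout from the other two constants, so the "at least the sum" requirement holds by construction rather than by comment. The constant names mirror the diff; the surrounding class is illustrative.

```java
import java.time.Duration;

// Derive the thread-cancel timeout from the cancel + post-cancel timeouts.
class CancelTimeouts {
  static final Duration CANCEL_TASK_TIMEOUT = Duration.ofSeconds(75);
  static final Duration POST_CANCEL_TASK_TIMEOUT = Duration.ofSeconds(15);
  // no separate hard-coded 90-second value to drift out of sync
  static final Duration CANCEL_TASK_THREAD_TIMEOUT =
      CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT);

  public static void main(String[] args) {
    System.out.println(CANCEL_TASK_THREAD_TIMEOUT);
  }
}
```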


if (toRemoveTasks.size() > 0) {
synchronized (_toStopTasks) {
// Its possible that while stopping the task was pending there was another OnAssignmentChange event
Collaborator:

nit: OnAssignmentChange -> onAssignmentChange

Collaborator (Author):

Done

@@ -271,15 +367,16 @@ private boolean stopTask(DatastreamTask datastreamTask, ConnectorTaskEntry conne
if (!connectorTask.awaitStop(POST_CANCEL_TASK_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
_logger.warn("Connector task for datastream task {} did not stop even after {} ms.", datastreamTask,
Collaborator:

Add a metric to track how often we are unable to stop tasks within the two cancel timeouts.

Collaborator (Author):

I added a counter to track the number of times a task stop fails. Please suggest if this is not the correct metric type.
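The shape of such a counter can be sketched as below: increment on every failed stop attempt, expose the running total. Brooklin has its own metrics framework, so this LongAdder-backed class and its names are only illustrative stand-ins.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of the stop-failure counter.
class StopTaskMetrics {
  private final LongAdder numStopTaskFailures = new LongAdder();

  // called when the cancel/post-cancel timeouts are exhausted without a stop
  void recordStopFailure() {
    numStopTaskFailures.increment();
  }

  long stopFailureCount() {
    return numStopTaskFailures.sum();
  }

  public static void main(String[] args) {
    StopTaskMetrics metrics = new StopTaskMetrics();
    metrics.recordStopFailure();
    System.out.println(metrics.stopFailureCount());
  }
}
```

A monotonically increasing counter fits here because operators only need to alert on the rate of failures, not a point-in-time gauge.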

…ailing to stop task

@@ -75,12 +77,72 @@ public void testConnectorRestartCalled() {
connector.stop();
}

@Test
public void testOnAssignmentChangeReassignment() {
Collaborator:

Will it be possible to add a test for the new metric you added to ensure it gets incremented when a task stop fails?

Collaborator (Author):

I added a check for that in the test "testOnAssignmentChangeStopTaskFailure" below at line 133, where I check that the metric count is non-zero. Is that what you were suggesting?

@somandal (Collaborator) left a comment:

Left a minor comment about adding a potential test case for the new metric. Please do ensure that you have captured the expected stopped partitions issue in the BMM debugging guide (and let the SREs know as well) so that people know that they should look for that as a potential RC.

@surajkn dismissed vmaheshw’s stale review September 20, 2021 20:11

Addressed this review comment

@jzakaryan self-requested a review September 21, 2021 23:29
@surajkn merged commit ac0523f into linkedin:master Sep 23, 2021
@surajkn deleted the restart_dead_tasks branch September 23, 2021 01:05
vmaheshw pushed a commit to vmaheshw/brooklin that referenced this pull request Mar 1, 2022
…ailing to stop task (linkedin#851)
