Fix running task data structure logic in AbstractKafkaConnector for failing to stop task #851
Conversation
...connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaConnector.java
synchronized (_runningTasks) {
  toRemoveTasks.forEach(_runningTasks::remove);
}
}
Earlier we had an issue with tasks that weren't stopped properly but were removed from running tasks. Now such tasks won't be removed. Other than logging that an attempt to stop a task failed or timed out, do we have an idea on how to deal with such "zombie" tasks? If they'll be living in memory indefinitely, could they become a source of a memory leak?
In the "restartDeadTasks" method we check again whether a given task is "dead" and, if so, attempt to stop it again. So if it's a transient error, eventually it should be stopped and removed. At least this is my understanding; please correct me if I am missing something.
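The retry behavior described above can be sketched as follows. The class and field names (DeadTaskRetrier, TaskEntry, failuresBeforeSuccess) are hypothetical, not Brooklin's actual API; the point is only that a dead task is removed from the running set when, and only when, a stop attempt finally succeeds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a periodic pass re-checks dead tasks and removes them
// from the running set only once a stop attempt succeeds, so transient stop
// failures are retried instead of being silently dropped.
class DeadTaskRetrier {
    static class TaskEntry {
        boolean dead;
        int stopAttempts;
        boolean stopped;
        // Simulates a transient failure: the first N stop attempts fail.
        int failuresBeforeSuccess;

        TaskEntry(boolean dead, int failuresBeforeSuccess) {
            this.dead = dead;
            this.failuresBeforeSuccess = failuresBeforeSuccess;
        }

        boolean tryStop() {
            stopAttempts++;
            if (stopAttempts > failuresBeforeSuccess) {
                stopped = true;
            }
            return stopped;
        }
    }

    private final Map<String, TaskEntry> runningTasks = new HashMap<>();

    void addTask(String name, TaskEntry entry) {
        runningTasks.put(name, entry);
    }

    // One periodic pass: stop dead tasks; remove only the ones that stopped.
    void restartDeadTasksPass() {
        runningTasks.entrySet().removeIf(e -> e.getValue().dead && e.getValue().tryStop());
    }

    int runningTaskCount() {
        return runningTasks.size();
    }
}
```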
It is important to understand the behavior if the thread is interrupted multiple times. Basically, with this logic, the task will not be restarted if the task thread goes zombie; instead we will wait for the task thread to die.
For non-transient errors, we will never be able to start a task to replace this dead task which isn't able to stop. This can lead to stopped-partitions problems and will be very hard to debug and RC.
Please ensure that you add a metric to track when we are unable to stop a task, so that we can quickly check this metric when we run into issues. Can you perhaps also add a comment explaining that this can lead to a stopped partitions issue?
Also, please ensure that we fix pieces of code that don't honor the interrupt signal in the future so that we can minimize this scenario. @surajkn and I don't have the full context on what kind of exceptions are seen such that we are unable to stop tasks. We are also not aware of how often this happens, so any insights from @vmaheshw or @jzakaryan will be helpful here.
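The requested metric could look something like the sketch below: a counter bumped whenever a stop attempt fails or times out. The class and metric names are hypothetical; Brooklin's real metrics are registered through its own metrics infrastructure rather than a bare AtomicLong.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a "failed to stop task" counter. In the actual
// connector this would be wired into the metrics registry so it can be
// checked quickly when a stopped-partitions issue is suspected.
class StopFailureMetrics {
    private final AtomicLong numTaskStopFailures = new AtomicLong();

    // Called whenever stopTask() fails or exceeds its timeout.
    void recordStopFailure() {
        numTaskStopFailures.incrementAndGet();
    }

    long getStopFailureCount() {
        return numTaskStopFailures.get();
    }
}
```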
AFAIK, this change will not work, because the task to be removed is still part of runningTasks and the restartDeadTasks logic will attempt to restart this task thread.
if (toCancel.size() > 0) {
  List<Future<DatastreamTask>> stopTaskFutures = toCancel.stream()
      .map(task -> _shutdownExecutorService.submit(() -> stopTask(task, _runningTasks.get(task))))
      .filter(Objects::nonNull)
Do you need to filter nonNull here?
Not required. Removed
stopTask(task, connectorTaskEntry);
List<DatastreamTask> toRemoveTasks = stopTaskFutures.stream().map(stopTaskFuture -> {
  try {
    return stopTaskFuture.get();
There should be a timeout in the get() call.
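The suggested fix can be sketched as below: bound Future.get() with a timeout so a task that never finishes stopping cannot block the caller forever. The helper names and the specific timeout values are illustrative only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: wait for a stop future with a bounded timeout.
// Returning null means "the stop did not complete", so the caller can
// leave the task in its bookkeeping structures instead of removing it.
class BoundedStopWait {
    static String waitForStop(Future<String> stopFuture, long timeoutMs) {
        try {
            return stopFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // stop did not finish in time
        } catch (ExecutionException e) {
            return null; // the stop itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Demo helper: a stop that hangs far past the timeout.
    static String demoHangingStop() {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = ex.submit(() -> { Thread.sleep(5000); return "stopped"; });
            return waitForStop(f, 100);
        } finally {
            ex.shutdownNow();
        }
    }

    // Demo helper: a stop that completes immediately.
    static String demoQuickStop() {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            Future<String> f = ex.submit(() -> "stopped");
            return waitForStop(f, 1000);
        } finally {
            ex.shutdownNow();
        }
    }
}
```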
Done
try {
  return stopTaskFuture.get();
} catch (ExecutionException | InterruptedException e) {
  e.printStackTrace();
Use a Logger to print the exception stack trace.
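The change asked for here can be sketched as follows. Brooklin itself logs through slf4j; java.util.logging is used below only to keep the example self-contained, and the class and method names are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: route the stop failure through a logger instead of
// e.printStackTrace(), so the stack trace carries context (which task failed)
// and lands in the configured log destination.
class StopTaskLogging {
    private static final Logger LOG = Logger.getLogger(StopTaskLogging.class.getName());

    static boolean handleStopFailure(String taskName, Exception e) {
        LOG.log(Level.WARNING, "Failed to stop datastream task " + taskName, e);
        return false; // signal to the caller that the stop did not complete
    }
}
```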
Done
@@ -256,7 +276,7 @@ protected void restartDeadTasks() {
 * @param connectorTaskEntry connectorTaskEntry corresponding to the datastream task.
 * @return true if it was successfully stopped, false if it was not.
Fix the return javadoc
Done
  return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

if (toRemoveTasks.size() > 0) {
There is a problem with this change.
If the task to be removed did not stop even after an InterruptedException, the task object will still be part of runningTasks, and restartDeadTasks will attempt to start the task again. handleAssignmentChange will not be re-queued, so restartDeadTasks will not have any information that this task is not supposed to be started.
Thanks for this clarification. I introduced another data structure, "toStopTasks", to keep track of tasks that need to be stopped because they have been unassigned. Basically, what is happening here is that the assignment is getting changed, and as part of that some tasks may be unassigned; those tasks need to be stopped. So even though stopping such tasks may fail, they still should not be part of "runningTasks".
Now, as part of this change, unassigned tasks will be added to "toStopTasks" and we will attempt to periodically stop any unstopped tasks. (Currently piggy-backing on the periodic execution of "restartDeadTasks" for this.)
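The two-map design described above can be sketched roughly as below. All names are hypothetical simplifications of the PR's _runningTasks/_tasksToStop maps: unassigned tasks move out of the running map immediately and into a pending-stop set, are never restarted, and a periodic pass keeps retrying the stop until it succeeds. The nested locking follows the ordering convention discussed later in this review (_runningTasks first, then _toStopTasks).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of the assignment-change bookkeeping.
class AssignmentTracker {
    private final Map<String, Boolean> runningTasks = new HashMap<>(); // task -> healthy
    private final Set<String> tasksToStop = new HashSet<>();

    void onAssignmentChange(String... assignment) {
        Set<String> newAssignment = new HashSet<>(Arrays.asList(assignment));
        synchronized (runningTasks) {
            // Tasks no longer assigned are queued for stopping, not restarted,
            // even if the stop attempt later fails.
            runningTasks.keySet().removeIf(task -> {
                if (!newAssignment.contains(task)) {
                    synchronized (tasksToStop) { // lock order: runningTasks, then tasksToStop
                        tasksToStop.add(task);
                    }
                    return true;
                }
                return false;
            });
            for (String task : newAssignment) {
                runningTasks.putIfAbsent(task, true);
            }
        }
    }

    // Periodic pass: attempt each pending stop; drop only the ones that succeeded.
    void stopPendingTasks(Predicate<String> stopAttempt) {
        synchronized (tasksToStop) {
            tasksToStop.removeIf(stopAttempt);
        }
    }

    int runningCount() {
        synchronized (runningTasks) { return runningTasks.size(); }
    }

    int pendingStopCount() {
        synchronized (tasksToStop) { return tasksToStop.size(); }
    }
}
```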
@surajkn Please also add this as a potential STOPPED partition scenario to the BMM debugging runbook and point to the relevant metrics once they are added. Also ensure this is well tested, as we discussed, since this is a very critical piece of code.
// multiple concurrent threads. If its required to access both of them together then the order of synchronizing
// should be _runningTasks first and then _toStopTasks.
nit: Can you reword the second sentence a bit:
"If access is required to both maps then the order of synchronization must be _runningTasks followed by _toStopTasks to prevent deadlocks."
Done
private final Map<DatastreamTask, ConnectorTaskEntry> _runningTasks = new HashMap<>();
private final Map<DatastreamTask, ConnectorTaskEntry> _toStopTasks = new HashMap<>();
nit: rename to _tasksToStop instead?
Done
// This is the only place where we synchronize on both _toStopTasks and _runningTasks. But if such a case
// arises elsewhere the order of synchronization should be _runningTasks and then _toStopTasks.
In the previous code block you are also accessing the lock for _toStopTasks, so this comment should be moved up and modified to talk about accessing both in this function.
Is it really necessary to leave a comment here since you've already left a comment where these are defined? In the future if someone adds another block where both are accessed, they may miss updating/removing this comment.
also nit to reword in case you decide to keep it:
"We only synchronize both _runningTasks and _toStopTasks in this function. If these need to be synchronized together in other parts of the code, please follow the correct lock ordering semantics."
That's a good point. It might be misleading if someone forgets to update or remove this comment when another such case comes up. Removed this comment.
  _runningTasks.put(task, createKafkaConnectorTask(task));
}
// If any tasks pending stop were re-assigned to this host we explicitly call restartDeadTasks to ensure
// stop and (re)start of the task. We do this to not wait for next periodic invocation of restartDeadTasks.
nit: reword: "We do this to avoid waiting for the next periodic invocation of restartDeadTasks."
Done
/**
 * Returns the number of tasks yet to be stopped.
 */
int getToStopTaskCount() {
nit: rename to getTasksToStopCount() if you changed the variable name from _toStopTasks to _tasksToStop
Done
/**
 * Returns the number of currently running tasks.
 */
int getRunningTaskCount() {
nit: rename: getRunningTasksCount()
Done
@@ -77,6 +81,8 @@

private static final Duration CANCEL_TASK_TIMEOUT = Duration.ofSeconds(75);
private static final Duration POST_CANCEL_TASK_TIMEOUT = Duration.ofSeconds(15);
// This should be at least equal to or greater than sum of cancel and post cancel timeouts
private static final Duration CANCEL_TASK_THREAD_TIMEOUT = Duration.ofSeconds(90);
Why not make variables for 75 and 15, and add them here? That'll be the best way to enforce what you're intending with your comment.
I removed this new variable and instead now rely on the explicit sum of the other two. I thought that would enforce it better than a comment.
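One way to express this, sketched below with Duration.plus: derive the thread-cancel timeout from the sum of the two stop-phase timeouts instead of hard-coding 90 seconds, so the invariant "thread timeout >= cancel + post-cancel" holds by construction. (The PR ultimately removed the derived constant and inlined the sum; the constant here is kept only to make the relationship explicit.)

```java
import java.time.Duration;

// Sketch: enforce the timeout relationship structurally rather than via a comment.
class CancelTimeouts {
    static final Duration CANCEL_TASK_TIMEOUT = Duration.ofSeconds(75);
    static final Duration POST_CANCEL_TASK_TIMEOUT = Duration.ofSeconds(15);
    // Always equals the sum of the two phases, 90s with the values above.
    static final Duration CANCEL_TASK_THREAD_TIMEOUT =
        CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT);
}
```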
if (toRemoveTasks.size() > 0) {
  synchronized (_toStopTasks) {
    // Its possible that while stopping the task was pending there was another OnAssignmentChange event
nit: OnAssignmentChange -> onAssignmentChange
Done
@@ -271,15 +367,16 @@ private boolean stopTask(DatastreamTask datastreamTask, ConnectorTaskEntry conne
if (!connectorTask.awaitStop(POST_CANCEL_TASK_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
  _logger.warn("Connector task for datastream task {} did not stop even after {} ms.", datastreamTask,
Add a metric to track how often we are unable to stop tasks within the two cancel timeouts.
I added a counter to track the number of times a task stop fails. Please suggest if this is not the correct metric type.
Fix running task data structure logic in AbstractKafkaConnector for failing to stop task

There are two issues with the existing logic around how the _runningTasks structure is managed.
1. In onAssignmentChange, we prematurely remove a task from the running task structure without checking whether stopTask succeeded.
2. In restartDeadTasks, we do not check whether the task was stopped successfully, and we create and start a new task regardless. And since the task acquire acts as a re-entrant lock, for a given pair of host and datastream task we can potentially end up with two tasks running in parallel.

This patch fixes that by introducing a "toStopTasks" structure which keeps track of all unassigned tasks that need to be stopped and should not be restarted. Now running tasks contains only those tasks that are healthy or have gone dead but need to be restarted.

The following scenarios/interactions were considered:
1) An onAssignmentChange event results in an assignment change, and as a result some tasks need to be stopped. These tasks should not be restarted.
2) Some task threads have gone dead but the task assignment has not changed; these tasks need to be restarted by stopping the current task thread and creating and starting another task thread instead. It's possible that stopping the task may fail, and we must keep retrying to stop the task before starting another thread.
3) In a rare case, it's possible that while a task is being stopped, because of being unassigned, it gets re-assigned back to the same host. Even in this case we must make sure to stop the current task thread first and then start another thread.
@@ -75,12 +77,72 @@ public void testConnectorRestartCalled() {
  connector.stop();
}

@Test
public void testOnAssignmentChangeReassignment() {
Will it be possible to add a test for the new metric you added to ensure it gets incremented when a task stop fails?
I added a check for that in the test "testOnAssignmentChangeStopTaskFailure" below; at line 133 I check that the metric count is non-zero. Is that what you were suggesting?
Left a minor comment about adding a potential test case for the new metric. Please do ensure that you have captured the expected stopped partitions issue in the BMM debugging guide (and let the SREs know as well) so that people know that they should look for that as a potential RC.