Skip to content

Commit

Permalink
Stop the tasks in parallel in AbstractKafkaConnector (linkedin#853)
Browse files Browse the repository at this point in the history
Currently, during container shutdown, the tasks are shut down sequentially in AbstractKafkaConnector. The shutdown code waits 60+ sec for the task threads to be killed. This increases the overall stop time when the Kafka producers have a lot of pending data to flush in a large-scale environment. Ideally, all the tasks would be stopped in parallel, as is already done when the task assignment changes.

This change uses the shutdown executor service to stop the tasks. When the executor is stopped, it force-kills any ongoing operations, so I'm changing the shutdown executor timeout to match the time it waits for a task to stop, which is currently 75 + 15 = 90 sec.
  • Loading branch information
vmaheshw authored and Vaibhav Maheshwari committed Mar 1, 2022
1 parent 8345760 commit d513412
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public abstract class AbstractKafkaConnector implements Connector, DiagnosticsAw

private static final Duration CANCEL_TASK_TIMEOUT = Duration.ofSeconds(75);
private static final Duration POST_CANCEL_TASK_TIMEOUT = Duration.ofSeconds(15);
private static final Duration SHUTDOWN_EXECUTOR_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30);
private static final Duration SHUTDOWN_EXECUTOR_SHUTDOWN_TIMEOUT = CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT);
static final Duration MIN_DAEMON_THREAD_STARTUP_DELAY = Duration.ofMinutes(2);

private static final String NUM_TASK_RESTARTS = "numTaskRestarts";
Expand Down Expand Up @@ -161,7 +161,7 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
// Mark the connector task as stopped so that, in case stopping the task here fails for any reason in
// restartDeadTasks the task is not restarted
synchronized (_tasksToStop) {
toCancel.stream().forEach(task -> {
toCancel.forEach(task -> {
_tasksToStop.put(task, _runningTasks.get(task));
_runningTasks.remove(task);
});
Expand All @@ -181,7 +181,6 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
// fields/properties of the DatastreamTask (e.g. dependencies).
_runningTasks.remove(task);
_runningTasks.put(task, connectorTaskEntry);
continue; // already running
} else {
if (_tasksToStop.containsKey(task)) {
toCallRestartDeadTasks = true;
Expand Down Expand Up @@ -318,7 +317,7 @@ private void stopUnassignedTasks() {
// requires that this step completely because we call this from onAssignmentChange() (assignment thread gets
// killed if it takes too long) and restartDeadTasks which must complete quickly.
List<Future<DatastreamTask>> stopTaskFutures = _tasksToStop.keySet().stream()
.map(task -> _shutdownExecutorService.submit(() -> stopTask(task, _tasksToStop.get(task))))
.map(task -> asyncStopTask(task, _tasksToStop.get(task)))
.collect(Collectors.toList());

_shutdownExecutorService.submit(() -> {
Expand All @@ -343,6 +342,11 @@ private void stopUnassignedTasks() {
}
}

/**
 * Hands the stop of a single task to the shutdown executor instead of running it on the calling thread.
 *
 * @param task the datastream task to stop
 * @param connectorTaskEntry the connector task entry passed along to {@code stopTask} for this task
 * @return the future returned by the shutdown executor for the submitted {@code stopTask} call
 */
@NotNull
private Future<DatastreamTask> asyncStopTask(DatastreamTask task, ConnectorTaskEntry connectorTaskEntry) {
  final Future<DatastreamTask> pendingStop =
      _shutdownExecutorService.submit(() -> stopTask(task, connectorTaskEntry));
  return pendingStop;
}

/**
* Stop the datastream task and wait for it to stop. If it has not stopped within a timeout, interrupt the thread.
* This method is expected to be idempotent.
Expand Down Expand Up @@ -398,13 +402,13 @@ protected boolean isTaskDead(ConnectorTaskEntry connectorTaskEntry) {
public void stop() {
_daemonThreadExecutorService.shutdown();
synchronized (_runningTasks) {
// Try to stop the the tasks
_runningTasks.forEach(this::stopTask);
// Try to stop the running tasks
_runningTasks.forEach(this::asyncStopTask);
_runningTasks.clear();
}
synchronized (_tasksToStop) {
// Try to stop the the tasks
_tasksToStop.forEach(this::stopTask);
// Try to stop the tasks
_tasksToStop.forEach(this::asyncStopTask);
_tasksToStop.clear();
}
_logger.info("Start to shut down the shutdown executor and wait up to {} ms.",
Expand Down

0 comments on commit d513412

Please sign in to comment.