Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop the tasks in parallel in AbstractKafkaConnector #853

Merged
merged 2 commits into from
Sep 28, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public abstract class AbstractKafkaConnector implements Connector, DiagnosticsAw

private static final Duration CANCEL_TASK_TIMEOUT = Duration.ofSeconds(75);
private static final Duration POST_CANCEL_TASK_TIMEOUT = Duration.ofSeconds(15);
private static final Duration SHUTDOWN_EXECUTOR_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30);
private static final Duration SHUTDOWN_EXECUTOR_SHUTDOWN_TIMEOUT = CANCEL_TASK_TIMEOUT.plus(POST_CANCEL_TASK_TIMEOUT);
static final Duration MIN_DAEMON_THREAD_STARTUP_DELAY = Duration.ofMinutes(2);

private static final String NUM_TASK_RESTARTS = "numTaskRestarts";
Expand Down Expand Up @@ -161,7 +161,7 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
// Mark the connector task as stopped so that, in case stopping the task here fails for any reason in
// restartDeadTasks the task is not restarted
synchronized (_tasksToStop) {
toCancel.stream().forEach(task -> {
toCancel.forEach(task -> {
_tasksToStop.put(task, _runningTasks.get(task));
_runningTasks.remove(task);
});
Expand All @@ -181,7 +181,6 @@ public synchronized void onAssignmentChange(List<DatastreamTask> tasks) {
// fields/properties of the DatastreamTask (e.g. dependencies).
_runningTasks.remove(task);
_runningTasks.put(task, connectorTaskEntry);
continue; // already running
} else {
if (_tasksToStop.containsKey(task)) {
toCallRestartDeadTasks = true;
Expand Down Expand Up @@ -318,7 +317,7 @@ private void stopUnassignedTasks() {
// requires that this step completely because we call this from onAssignmentChange() (assignment thread gets
// killed if it takes too long) and restartDeadTasks which must complete quickly.
List<Future<DatastreamTask>> stopTaskFutures = _tasksToStop.keySet().stream()
.map(task -> _shutdownExecutorService.submit(() -> stopTask(task, _tasksToStop.get(task))))
.map(task -> asyncStopTask(task, _tasksToStop.get(task)))
.collect(Collectors.toList());

_shutdownExecutorService.submit(() -> {
Expand All @@ -343,6 +342,11 @@ private void stopUnassignedTasks() {
}
}

@NotNull
private Future<DatastreamTask> asyncStopTask(DatastreamTask task, ConnectorTaskEntry connectorTaskEntry) {
return _shutdownExecutorService.submit(() -> stopTask(task, connectorTaskEntry));
}

/**
* Stop the datastream task and wait for it to stop. If it has not stopped within a timeout, interrupt the thread.
* This method is expected to be idempotent.
Expand Down Expand Up @@ -398,13 +402,13 @@ protected boolean isTaskDead(ConnectorTaskEntry connectorTaskEntry) {
public void stop() {
_daemonThreadExecutorService.shutdown();
synchronized (_runningTasks) {
// Try to stop the the tasks
_runningTasks.forEach(this::stopTask);
// Try to stop the running tasks
_runningTasks.forEach(this::asyncStopTask);
_runningTasks.clear();
}
synchronized (_tasksToStop) {
// Try to stop the the tasks
_tasksToStop.forEach(this::stopTask);
// Try to stop the tasks
_tasksToStop.forEach(this::asyncStopTask);
_tasksToStop.clear();
}
_logger.info("Start to shut down the shutdown executor and wait up to {} ms.",
Expand Down