Restore tasks when lifecycle start #14909

Merged · 7 commits · Sep 22, 2023
Changes from 4 commits
@@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -60,7 +61,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -309,23 +309,23 @@ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new ArrayList<>();
return ImmutableList.of();
}

@Override
@LifecycleStart
public void start()
{
for (Job job : client.getPeonJobs()) {
try {
Task task = adapter.toTask(job);
restoredTasks.add(Pair.of(task, joinAsync(task)));
joinAsync(adapter.toTask(job));
Contributor

Does this really fix the bug? It looks like after this change, once start() completes we can be sure that the tasks map has all the right task IDs in it, but we can't be sure that it has the most up-to-date statuses. The statuses are still being restored asynchronously by joinAsync, and that could be happening in the background after start() exits.

Could you please consider this, and determine if it's OK or not? If it's OK, please add a comment here explaining why, so future readers don't need to wonder whether the async restoration is OK. If it's not OK, then please update the code to wait for the most up-to-date statuses to be loaded before returning from start().
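
For illustration, here is a minimal sketch (not code from this PR) of the blocking variant being asked about. It assumes the surrounding KubernetesTaskRunner fields (client, adapter, log) plus Guava's Futures, and collects the joinAsync futures into a hypothetical restoredFutures list:

// Hypothetical sketch: make start() wait until every restored future resolves.
// These futures resolve when the restored tasks finish, so whether blocking
// like this is acceptable is exactly the question raised above.
List<ListenableFuture<TaskStatus>> restoredFutures = new ArrayList<>();
for (Job job : client.getPeonJobs()) {
  try {
    restoredFutures.add(joinAsync(adapter.toTask(job)));
  }
  catch (IOException e) {
    log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
  }
}
try {
  // Guava: wait for all restore futures before start() returns.
  Futures.allAsList(restoredFutures).get();
}
catch (InterruptedException | ExecutionException e) {
  throw new RuntimeException(e);
}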

Contributor

The behavior we saw was that this change made task failures during rollover less common (probably because there's still a race condition here). I think we should look at how the HttpRemoteTaskRunner solves this.

Contributor

@gianm

I tried to add some code in the start logic of KubernetesTaskRunner to check for tasks that had completed before the overlord came up and wait on their futures to complete, but this didn't actually solve the problem.

I think this is because the callbacks that are responsible for listening to the taskRunner future and updating the task's status in TaskStorage are added by the TaskQueue manageInternalCritical block, which doesn't get run during LifecycleStart.

I think this may actually be a general race condition with supervisors that MM-less ingestion runs into more often. I will need more time to figure out what's happening, but for now I think this PR can be merged, because the logic of listing all jobs in k8s and adding them to the tasks map definitely belongs in the start method and not the restore method. (The restore method appears to be called in every manage() run in TaskQueue.)
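
For context, the callback wiring being described (TaskQueue listening to the runner's future and persisting the task's status) looks conceptually like the sketch below. This is a hedged illustration, not the actual TaskQueue code; taskRunner, taskStorage, and the failure handling are placeholders, and it relies on Guava's Futures, FutureCallback, and MoreExecutors:

// Hypothetical sketch of a status-persisting callback attached to the runner future.
// Until something registers a callback like this, a restored future can resolve
// without the final status ever being written to TaskStorage.
ListenableFuture<TaskStatus> statusFuture = taskRunner.run(task);
Futures.addCallback(
    statusFuture,
    new FutureCallback<TaskStatus>()
    {
      @Override
      public void onSuccess(TaskStatus status)
      {
        // Placeholder for persisting the final status.
        taskStorage.setStatus(status);
      }

      @Override
      public void onFailure(Throwable t)
      {
        // Placeholder for recording a failure status.
        taskStorage.setStatus(TaskStatus.failure(task.getId(), t.getMessage()));
      }
    },
    MoreExecutors.directExecutor()
);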

Contributor Author

As George mentioned, this change has reduced the frequency of task failures during rollovers. To comprehensively address the issue, we might consider persisting the status of SeekableStreamIndexTaskRunner in the database. This would allow for accurate restoration on startup, so we wouldn't need to rely on the TaskRunner for the latest task statuses at start time. However, this enhancement will be tackled in a subsequent PR.
(We've also observed similar symptoms with Middle Manager streaming ingestion.)

Contributor

I don't think this change fixes the issue described in the PR description (task failures during rollovers). It still seems like a good idea, because restore() gets called a lot by TaskQueue.

@YongGang I might change the description of the PR since this isn't targeted at fixing the bugs you were seeing anymore.

}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
}
}
return restoredTasks;
}
log.info("Loaded %,d tasks from previous run", tasks.size());

@Override
@LifecycleStart
public void start()
{
cleanupExecutor.scheduleAtFixedRate(
() ->
client.deleteCompletedPeonJobsOlderThan(
@@ -339,7 +339,6 @@
log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay());
}


@Override
@LifecycleStop
public void stop()
@@ -32,7 +32,6 @@
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -56,7 +55,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -101,6 +99,84 @@ public void setup()
);
}

@Test
public void test_start_withExistingJobs() throws IOException
{
KubernetesTaskRunner runner = new KubernetesTaskRunner(
taskAdapter,
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter
)
{
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null))
.getResult();
}
};

Job job = new JobBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.build();

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);

replayAll();

runner.start();

verifyAll();

Assert.assertNotNull(runner.tasks);
Assert.assertEquals(1, runner.tasks.size());
}

@Test
public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException
{
KubernetesTaskRunner runner = new KubernetesTaskRunner(
taskAdapter,
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter
)
{
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null))
.getResult();
}
};

Job job = new JobBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.build();

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());

replayAll();

runner.start();

verifyAll();

Assert.assertNotNull(runner.tasks);
Assert.assertEquals(0, runner.tasks.size());
}

@Test
public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional()
{
@@ -263,80 +339,6 @@ public void test_shutdown_withoutExistingTask()
runner.shutdown(task.getId(), "");
}

@Test
public void test_restore_withExistingJobs() throws IOException
Contributor

Why do we need to delete all these tests again?

Contributor Author

Added the missing one back.

{
KubernetesTaskRunner runner = new KubernetesTaskRunner(
taskAdapter,
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter
) {
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
return new KubernetesWorkItem(task, null).getResult();
}
};

Job job = new JobBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.build();

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);

replayAll();

List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();

verifyAll();

Assert.assertNotNull(tasks);
Assert.assertEquals(1, tasks.size());
}

@Test
public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws IOException
{
KubernetesTaskRunner runner = new KubernetesTaskRunner(
taskAdapter,
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter
) {
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
return new KubernetesWorkItem(task, null).getResult();
}
};

Job job = new JobBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.build();

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());

replayAll();

List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();

verifyAll();

Assert.assertNotNull(tasks);
Assert.assertEquals(0, tasks.size());
}

@Test
public void test_getTotalTaskSlotCount()
{