Restore tasks when lifecycle start #14909
Merged
Changes from 2 commits

Commits (7):
- d452c93 K8s tasks restore should be from lifecycle start (YongGang)
- e82213c add test (YongGang)
- ba00f9d add more tests (YongGang)
- 25bec0a fix test (YongGang)
- 2d4a1a8 wait tasks restore finish when start (YongGang)
- 9414030 fix style (YongGang)
- a360c22 revert previous change and add comment (YongGang)
@@ -32,7 +32,6 @@
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.java.util.http.client.HttpClient;
@@ -56,7 +55,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -101,6 +99,29 @@ public void setup()
     );
   }

+  @Test
+  public void test_start_withExistingJobs() throws IOException
+  {
+    Job job = new JobBuilder()
+        .withNewMetadata()
+        .withName(ID)
+        .endMetadata()
+        .build();
+
+    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
+    EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+    EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andReturn(TaskStatus.running(task.getId()));
+
+    replayAll();
+
+    runner.start();
+
+    verifyAll();
+
+    Assert.assertNotNull(runner.tasks);
+    Assert.assertEquals(1, runner.tasks.size());
+  }
+
   @Test
   public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional()
   {
@@ -263,80 +284,6 @@ public void test_shutdown_withoutExistingTask()
     runner.shutdown(task.getId(), "");
   }

-  @Test
-  public void test_restore_withExistingJobs() throws IOException
-  {
-    KubernetesTaskRunner runner = new KubernetesTaskRunner(
-        taskAdapter,
-        config,
-        peonClient,
-        httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
-    ) {
-      @Override
-      protected ListenableFuture<TaskStatus> joinAsync(Task task)
-      {
-        return new KubernetesWorkItem(task, null).getResult();
-      }
-    };
-
-    Job job = new JobBuilder()
-        .withNewMetadata()
-        .withName(ID)
-        .endMetadata()
-        .build();
-
-    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
-    EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
-
-    replayAll();
-
-    List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();
-
-    verifyAll();
-
-    Assert.assertNotNull(tasks);
-    Assert.assertEquals(1, tasks.size());
-  }
-
-  @Test
-  public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws IOException
-  {
-    KubernetesTaskRunner runner = new KubernetesTaskRunner(
-        taskAdapter,
-        config,
-        peonClient,
-        httpClient,
-        new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
-        emitter
-    ) {
-      @Override
-      protected ListenableFuture<TaskStatus> joinAsync(Task task)
-      {
-        return new KubernetesWorkItem(task, null).getResult();
-      }
-    };
-
-    Job job = new JobBuilder()
-        .withNewMetadata()
-        .withName(ID)
-        .endMetadata()
-        .build();
-
-    EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
-    EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
-
-    replayAll();
-
-    List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();
-
-    verifyAll();
-
-    Assert.assertNotNull(tasks);
-    Assert.assertEquals(0, tasks.size());
-  }
-
   @Test
   public void test_getTotalTaskSlotCount()
   {

Review thread on the removed test_restore_withExistingJobs:
> why do we need to delete all these tests again?

Reply: added the missing one back.
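To make the shape of the change easier to follow, here is a self-contained toy sketch of the pattern the new test asserts: job discovery and re-registration happen in `start()` rather than in `restore()`, while each task's status is joined asynchronously. This is not Druid code; the class, field, and task names below are made up for illustration.

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;

// Toy runner: on start(), list the "jobs" that survived the restart and
// re-register each one, re-attaching to it asynchronously.
public class ToyTaskRunner
{
  private final ConcurrentMap<String, ListenableFuture<String>> tasks = new ConcurrentHashMap<>();
  private final ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
  private final List<String> survivingJobs;

  public ToyTaskRunner(List<String> survivingJobs)
  {
    this.survivingJobs = survivingJobs;
  }

  public void start()
  {
    for (String jobId : survivingJobs) {
      // stand-in for joinAsync: the status resolves later, in the background
      tasks.putIfAbsent(jobId, exec.submit(() -> jobId + ": SUCCESS"));
    }
  }

  public int taskCount()
  {
    return tasks.size();
  }

  public void stop()
  {
    exec.shutdown();
  }

  public static void main(String[] args)
  {
    ToyTaskRunner runner = new ToyTaskRunner(List.of("task-a", "task-b"));
    runner.start();
    // Like the test: the task IDs are known as soon as start() returns...
    System.out.println("restored " + runner.taskCount() + " tasks");
    // ...but the statuses may still be resolving asynchronously,
    // which is exactly the concern raised in the review below.
    runner.stop();
  }
}
```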
Review comments
Does this really fix the bug? It looks like after this change, once `start()` completes we can be sure that `tasks` has all the right task IDs in it, but we can't be sure that it has the most up-to-date statuses. The statuses are still being restored asynchronously by `joinAsync`, and that could be happening in the background after `start()` exits. Could you please consider this, and determine if it's OK or not? If it's OK, please add a comment here explaining why, so future readers don't need to wonder if the async restoration is OK. If it's not OK, then please update the code to wait for the most up-to-date statuses to be loaded before returning from `start()`.
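As a generic illustration of the blocking approach this comment asks the author to consider (not the PR's actual code; the executor, task names, and plain String statuses below are stand-ins for what `joinAsync` would return), one could collect the futures and wait on them with Guava before `start()` returns:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;
import java.util.concurrent.Executors;

public class WaitForRestoreExample
{
  public static void main(String[] args) throws Exception
  {
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

    // Stand-ins for the futures joinAsync would return for each restored task.
    List<ListenableFuture<String>> statusFutures = ImmutableList.of(
        exec.submit(() -> "task-a: RUNNING"),
        exec.submit(() -> "task-b: RUNNING")
    );

    // Block until every restored task has reported before start() would return.
    List<String> statuses = Futures.allAsList(statusFutures).get();
    statuses.forEach(System.out::println);

    exec.shutdown();
  }
}
```

As the commit history ("wait tasks restore finish when start" followed by "revert previous change and add comment") and the replies below indicate, blocking on these futures alone did not resolve the underlying race, so this sketch only shows the mechanism under discussion, not the adopted fix.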
the behavior we saw was this change made task failures during rollover less common (probably because there's still a race condition here). i think we should look at how the HttpRemoteTaskRunner solves this
@gianm I tried to add some code in the start logic of KubernetesTaskRunner to check the tasks that had completed before the Overlord came up and wait on their futures to complete, but this didn't actually solve the problem.

I think this is because the callbacks that are responsible for listening to the taskRunner future and updating the task's status in TaskStorage are added by the TaskQueue `manageInternalCritical` block, which doesn't get run during LifecycleStart.

I think this may actually be a general race condition with supervisors that MM-less ingestion may be running into more often. It will need more time to figure out what's happening, but for now I think this PR can be merged, because the logic of listing all jobs in Kubernetes and adding them to the tasks map definitely belongs in the start method and not the restore method (the restore method appears to be called on every manage() run in TaskQueue).
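To illustrate the registration gap described here, the following is a stand-in sketch with simplified types, not Druid's TaskQueue or TaskStorage code: a Guava `ListenableFuture` only does useful work through the callbacks attached to it, so until the queue's management loop registers its status-persisting listener, a status resolved during startup has no observer acting on it.

```java
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

public class StatusCallbackExample
{
  public static void main(String[] args)
  {
    // Stand-in for the future a task runner returns for a restored task
    // (the real type would be ListenableFuture<TaskStatus>).
    SettableFuture<String> statusFuture = SettableFuture.create();

    // Stand-in for the status-persisting listener that, per the comment above,
    // only gets attached once the queue's management loop runs.
    Futures.addCallback(
        statusFuture,
        new FutureCallback<String>()
        {
          @Override
          public void onSuccess(String status)
          {
            System.out.println("persist status: " + status);
          }

          @Override
          public void onFailure(Throwable t)
          {
            System.out.println("record failure: " + t);
          }
        },
        MoreExecutors.directExecutor()
    );

    // Completing the future now triggers the callback. If it had completed
    // before addCallback, the callback would still fire at registration time,
    // but nothing would have acted on the status in the interim.
    statusFuture.set("SUCCESS");
  }
}
```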
As George mentioned, this change has reduced the frequency of task failures during rollovers. To address the issue comprehensively, we might consider persisting the status of `SeekableStreamIndexTaskRunner` in the database. That would allow accurate restoration on startup, so we wouldn't need to rely on the TaskRunner for the latest task statuses at start time. However, that enhancement will be tackled in a subsequent PR. (We've also observed similar symptoms with Middle Manager streaming ingestion.)
i don't think this change fixes the issue described in the PR description (task failures during rollovers). it still seems like a good idea because restore() gets called a lot by TaskQueue.
@YongGang i might change the description of the PR since this isn't targeted at fixing the bugs you were seeing anymore.