Restore tasks when lifecycle start #14909
Conversation
@@ -263,80 +284,6 @@ public void test_shutdown_withoutExistingTask()
    runner.shutdown(task.getId(), "");
  }

  @Test
  public void test_restore_withExistingJobs() throws IOException
why do we need to delete all these tests again?
added the missing one back.
for (Job job : client.getPeonJobs()) {
  try {
    Task task = adapter.toTask(job);
    restoredTasks.add(Pair.of(task, joinAsync(task)));
    joinAsync(adapter.toTask(job));
Does this really fix the bug? It looks like after this change, once start() completes we can be sure that tasks has all the right task IDs in it, but we can't be sure that it has the most up-to-date statuses. The statuses are still being restored asynchronously by joinAsync, and that could be happening in the background after start() exits.

Could you please consider this, and determine if it's OK or not? If it's OK, please add a comment here explaining why, so future readers don't need to wonder if the async restoration is OK. If it's not OK, then please update the code to wait for the most up-to-date statuses to be loaded before returning from start().
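For reference, a minimal sketch of the "wait before returning" option described above, assuming joinAsync returns a Guava ListenableFuture<TaskStatus> as the diff suggests; client, adapter, and joinAsync come from the fragment above, while restoreFutures and the error handling are invented for illustration:

// Illustrative sketch only, not the actual PR code: block in start() until every
// restored task has reported a status, so callers never observe stale state.
List<ListenableFuture<TaskStatus>> restoreFutures = new ArrayList<>();
for (Job job : client.getPeonJobs()) {
  try {
    restoreFutures.add(joinAsync(adapter.toTask(job)));
  }
  catch (IOException e) {
    // Skip jobs we cannot map back to tasks, mirroring the try/catch in the diff.
  }
}
try {
  // Guava's Futures.allAsList completes once all restore futures have completed.
  Futures.allAsList(restoreFutures).get();
}
catch (InterruptedException | ExecutionException e) {
  throw new RuntimeException("Failed waiting for restored task statuses", e);
}

The trade-off is a slower lifecycle start in exchange for deterministic statuses once start() returns.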
The behavior we saw was that this change made task failures during rollover less common (probably because there's still a race condition here). I think we should look at how the HttpRemoteTaskRunner solves this.
I tried to add some code in the start logic of KubernetesTaskRunner to check the tasks that had completed before the Overlord came up and wait on their futures to complete, but this didn't actually solve the problem.
I think this is because the callbacks responsible for listening to the taskRunner future and updating the task's status in TaskStorage are added by the TaskQueue manageInternalCritical block, which doesn't get run during LifecycleStart.
I think this may actually be a general race condition with supervisors that mm-less ingestion is running into more often. It will need more time to figure out what's happening, but for now I think this PR can be merged, because the logic of listing all jobs in k8s and adding them to the tasks map definitely belongs in the start method and not the restore method (the restore method appears to be called on every manage() run in TaskQueue).
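To make the callback point above concrete, here is a hedged sketch of the kind of completion callback being described; it paraphrases what TaskQueue does from memory, and names such as taskStorage.setStatus are illustrative rather than the exact Druid API:

// Illustrative only: TaskQueue attaches a callback like this to the future returned
// by the task runner, and that callback is what persists the final status. If this
// wiring is only set up in manageInternalCritical(), it is not yet in place while
// the runner's start() is restoring jobs, which would explain the race described above.
ListenableFuture<TaskStatus> statusFuture = taskRunner.run(task);
Futures.addCallback(
    statusFuture,
    new FutureCallback<TaskStatus>()
    {
      @Override
      public void onSuccess(TaskStatus status)
      {
        taskStorage.setStatus(status); // persist the completed status (illustrative call)
      }

      @Override
      public void onFailure(Throwable t)
      {
        taskStorage.setStatus(TaskStatus.failure(task.getId(), t.getMessage())); // illustrative
      }
    },
    MoreExecutors.directExecutor()
);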
As George mentioned, this change has reduced the frequency of task failures during rollovers. To comprehensively address the issue, we might consider persisting the status of SeekableStreamIndexTaskRunner in the database. This would allow for accurate restoration upon startup, so we wouldn't need to rely on the TaskRunner for the latest task statuses at start. However, this enhancement will be tackled in a subsequent PR.
(We've also observed similar symptoms with Middle Manager streaming ingestion.)
I don't think this change fixes the issue described in the PR description (task failures during rollovers). It still seems like a good idea because restore() gets called a lot by TaskQueue.
@YongGang I might change the description of the PR since this isn't targeted at fixing the bugs you were seeing anymore.
LGTM
Having the TaskRunner read the peon jobs from the k8s client in the start method seems like the correct place to do this.
@gianm I'm going to merge this change since it is in a contrib extension and makes things more stable. We will address the comment about explaining why the async restoration is acceptable in a follow-up.
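For readers skimming the thread, a rough sketch of the shape this PR moves toward; the @LifecycleStart annotation and the log call are assumptions based on how Druid runners are usually wired, and only client, adapter, and joinAsync come from the diff:

@LifecycleStart
public void start()
{
  // Runs once when the Overlord lifecycle starts: discover peon jobs that are
  // already running in Kubernetes and re-attach to them here, instead of doing
  // this work on every TaskQueue.manage() pass via restore().
  for (Job job : client.getPeonJobs()) {
    try {
      joinAsync(adapter.toTask(job));
    }
    catch (IOException e) {
      log.error(e, "Could not restore task from k8s job [%s]", job.getMetadata().getName());
    }
  }
}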
Description
Move the task restore logic to the lifecycle start method in KubernetesTaskRunner; this also aligns with what other remote task runners do.

Release note
Change to restore tasks when the lifecycle starts.

Key changed/added classes in this PR
KubernetesTaskRunner: move the task restoration logic from the restore method to the start method.

This PR has: