
Separate task lifecycle from kubernetes/location lifecycle #15133

Merged: 12 commits, Oct 17, 2023

Conversation

georgew5656 (Contributor)

Description

When running higher volumes of ingestion on the KubernetesTaskRunner (especially streaming), there are issues caused by the mismatch between the Kubernetes lifecycle (pod startup and completion) and the Druid task lifecycle (when a peon JVM has spun up and is ready to serve requests, and when it has shut down):

  • During streaming task startup, in AbstractTask.setup, the chat handler (via getChatHandlerProvider) gets registered after the UpdateLocation action submission. This can cause issues when there is a lot of load on the overlord, because the task gets stuck retrying these /action submissions even though its chat handler has not been registered, so the supervisor can't actually communicate with the task yet.

  • Similarly, the UpdateLocation action during AbstractTask.cleanUp also frequently causes issues during streaming task cleanup when there is a lot of load on the overlord. The cleanUp method is called after the chat handler provider is deregistered, so when the task gets stuck doing cleanup, there is a risk of the supervisor trying to chat with the task while it is in the process of exiting.

  • On larger Kubernetes clusters, it can take a while for K8s to report that a pod has successfully exited, meaning there can be a significant lag between when a peon JVM exits and when the KubernetesTaskRunner can report the task as completed. In general this slows down how quickly tasks can be reported as successful, and for streaming tasks it can cause issues similar to the UpdateLocation problems above.

There is a tradeoff between having the peon hit the /action endpoint on the overlord with UpdateStatusAction and UpdateLocationAction, which gives the K8s task runner a more accurate view of where the peon is in the task lifecycle, and the time and failure risk those extra requests add.

My overall approach was to let the KubernetesTaskRunner/KubernetesPeonLifecycle (the pieces running on the overlord) handle the Kubernetes/TaskLocation lifecycle, but have the peon be directly responsible for the task lifecycle by using the UpdateStatusAction as a way to mark the task future as complete.

Following this approach, I made two significant changes:

  • I removed all the UpdateLocationAction calls in the peon and let the K8s task runner manage the task's location itself. Specifically, the K8s task runner now notifies listeners that a task's location has changed when the K8s pod gets its IP and when the K8s pod lifecycle completes.
  • I separated the K8s lifecycle logic from the Druid task lifecycle logic. The exec service in KubernetesTaskRunner is still responsible for submitting a K8s job for a task, waiting for the pod to come up, and then deleting the pod when it completes, but run(Task) no longer returns this future as the status of the task. Instead, run(Task) returns a settable future that is completed either when the Kubernetes lifecycle completes or when the UpdateStatusAction is sent from the peon (see the sketch after this list). This means that tasks now complete when the peon finishes its cleanup and notifies the K8s task runner that it has completed.
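
To make this wiring concrete, here is a minimal sketch of the result-future pattern described in the second bullet. It assumes Guava's SettableFuture (which Druid already uses); the class, method, and status names below are illustrative stand-ins, not the actual KubernetesTaskRunner code.

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Executors;

// Illustrative only: the task's result future is a SettableFuture that either the
// Kubernetes lifecycle (overlord side) or the peon's status report can complete.
// Only the first set() takes effect, so whichever side reports first wins.
public class TaskResultFutureSketch
{
  private final SettableFuture<String> taskResult = SettableFuture.create(); // stands in for a future of TaskStatus
  private final ListeningExecutorService exec =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

  // run(Task) analogue: kick off the Kubernetes lifecycle, but hand back the
  // settable future rather than the executor's own future.
  public ListenableFuture<String> run()
  {
    exec.submit(this::runKubernetesLifecycle);
    return taskResult;
  }

  // Overlord side: submit the K8s job, wait for the pod, delete it when it completes.
  // If the peon never reported its status, fall back to a status derived from the pod.
  private void runKubernetesLifecycle()
  {
    // ... launch job, watch pod, clean up ...
    taskResult.set("SUCCESS (derived from pod state)");
  }

  // Peon side: an UpdateStatusAction received by the overlord completes the future
  // directly, so the task finishes as soon as the peon has done its own cleanup.
  public void onUpdateStatusAction(String reportedStatus)
  {
    taskResult.set(reportedStatus);
  }
}
```

With this shape, the future returned by exec.submit() only drives the pod lifecycle, which is why the new code in the diffs further down no longer uses it as the task's result.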

I also made a few small cleanup changes:

  • registerListener should call notifyStatus on the new listener for running tasks (a rough sketch follows this list).
  • Removed the PeonPhase class since it wasn't serving any purpose.
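
For the first cleanup item, the intent is roughly the following; the types here are simplified stand-ins for illustration, not Druid's actual TaskRunnerListener or KubernetesWorkItem interfaces.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified stand-ins; the real types live in the Druid indexing-service and
// kubernetes-overlord-extensions modules and have richer signatures.
interface TaskListener
{
  void statusChanged(String taskId, String status);
}

class ListenerRegistrySketch
{
  private final List<TaskListener> listeners = new CopyOnWriteArrayList<>();
  private final Map<String, String> runningTaskStatuses = new ConcurrentHashMap<>();

  // A newly registered listener is immediately told about tasks that are already
  // running, so it does not miss notifications sent before it registered.
  void registerListener(TaskListener listener)
  {
    listeners.add(listener);
    runningTaskStatuses.forEach(listener::statusChanged);
  }
}
```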

Release note

  • Clean up lifecycle management of tasks in mm-less task scheduling

Key changed/added classes in this PR
  • KubernetesTaskRunner
  • KubernetesPeonLifecycle
  • AbstractTask
  • KubernetesWorkItem

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

-return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
-            .getResult();
+return tasks.computeIfAbsent(task.getId(), k -> {
+  ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task));

Check notice (Code scanning / CodeQL): Unread local variable. Variable 'ListenableFuture unused' is never read.
-return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
-            .getResult();
+return tasks.computeIfAbsent(task.getId(), k -> {
+  ListenableFuture<TaskStatus> unused = exec.submit(() -> joinTask(task));

Check notice (Code scanning / CodeQL): Unread local variable. Variable 'ListenableFuture unused' is never read.
@suneet-s (Contributor)

+1 the approach makes sense to me. Some small code suggestions.

> but have the peon be directly responsible for the task lifecycle by using the UpdateStatusAction as a way to mark the task future as complete.

With this change, can you test a few scenarios where a peon is killed directly from k8s while it is still processing data:

  • peon running a query_worker task
  • peon running a query_controller task
  • peon for a supervisor task (either kafka or kinesis should be fine)
  • peon running a compact task
  • peon running an index_parallel task
  • peon running a compact / index_parallel subtask (like partial_dimension_distribution or something)

}

shutdown();
Contributor:

Can shutdown() throw an exception, since it is making a call to the Kubernetes client? If so, stopTask should probably be in its own finally block.

Contributor Author (georgew5656):

I decided to group saveLogs and shutdown together since they are both K8s lifecycle cleanup actions (it is okay if one fails), and then moved stopTask to a finally block because it has to happen.
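
Roughly, that grouping looks like the sketch below. The method names saveLogs(), shutdown(), and stopTask() are the ones mentioned in this thread; the stub bodies are placeholders, not the real KubernetesPeonLifecycle implementation.

```java
// Sketch of the cleanup ordering discussed above: the Kubernetes-side cleanup is
// grouped in one try block, while finalizing the Druid task lifecycle always runs.
class PeonCleanupSketch
{
  void cleanup()
  {
    try {
      saveLogs();   // K8s lifecycle cleanup: persist the pod's logs (best effort)
      shutdown();   // K8s lifecycle cleanup: delete the job/pod via the Kubernetes client
    }
    finally {
      stopTask();   // must always run so the Druid task lifecycle is finalized
    }
  }

  void saveLogs() {}
  void shutdown() {}
  void stopTask() {}
}
```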

@@ -172,10 +176,12 @@ private TaskStatus joinTask(Task task)
  @VisibleForTesting
  protected TaskStatus doTask(Task task, boolean run)
  {
    TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started");
Contributor:

A bit weird to initialize the status with a failure; I don't think we need it.


for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
  if (entry.getValue().isRunning()) {
    TaskRunnerUtils.notifyLocationChanged(

@georgew5656 (Contributor Author)

> With this change, can you test a few scenarios where a peon is killed directly from k8s while it is still processing data: [the query_worker, query_controller, supervisor, compact, index_parallel, and subtask scenarios listed above]

I tested these situations and confirmed that in the batch/compact case the subtasks and parent task fail as expected and no segments are written. For streaming, the supervisor starts a new task with the original offset.

@suneet-s (Contributor) left a comment:

👍

georgew5656 merged commit dc0b163 into apache:master on Oct 17, 2023
81 checks passed
@cryptoe (Contributor) commented Nov 8, 2023

With this patch, task duration is being reported as -1 for successful tasks.
@georgew5656 Can you please confirm whether you are seeing this issue?

georgew5656 added a commit to georgew5656/druid that referenced this pull request Nov 8, 2023
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023

* Separate k8s and druid task lifecycles

* Remove extra log lines

* Fix unit tests

* fix unit tests

* Fix unit tests

* notify listeners on task completion

* Fix unit test

* unused var

* PR changes

* Fix unit tests

* Fix checkstyle

* PR changes
CaseyPan pushed a commit to CaseyPan/druid that referenced this pull request Nov 17, 2023
LakshSingla added this to the 29.0.0 milestone on Jan 29, 2024