Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate task lifecycle from kubernetes/location lifecycle #15133

Merged
merged 12 commits into from
Oct 17, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,4 @@ public void updateStatus(Task task, TaskStatus status)
{
kubernetesTaskRunner.updateStatus(task, status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
kubernetesTaskRunner.updateLocation(task, location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
finally {
try {
saveLogs();
shutdown();
}
catch (Exception e) {
log.warn(e, "Cleanup failed for task [%s]", taskId);
log.warn(e, "Log processing failed for task [%s]", taskId);
}

shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can shutdown() throw an exception? since it is making a call to the kubernetes client. If so, the stopTask should probably be in it's own finally block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i decided to group saveLogs and shutdown together since they are both k8s lifecycle cleanup actions (it is okay if one fails), and then moved stopTask to a finally block b/c it has to happen

stopTask();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,4 @@ public void test_updateStatus()
runner.updateStatus(task, TaskStatus.running(ID));
verifyAll();
}

@Test
public void test_updateLocation()
{
kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown());
replayAll();
runner.updateLocation(task, TaskLocation.unknown());
verifyAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,11 @@ public void test_registerListener_runningTask()
emitter
);

KubernetesPeonLifecycle kubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class);
EasyMock.expect(kubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(kubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown());
KubernetesPeonLifecycle runningKubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class);
EasyMock.expect(runningKubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING);
EasyMock.expect(runningKubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown());
KubernetesWorkItem workItem = new KubernetesWorkItem(task);
workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle);
workItem.setKubernetesPeonLifecycle(runningKubernetesPeonLifecycle);
runner.tasks.put(task.getId(), workItem);

Executor executor = EasyMock.mock(Executor.class);
Expand All @@ -349,10 +349,10 @@ public void test_registerListener_runningTask()
EasyMock.expectLastCall();

replayAll();
EasyMock.replay(kubernetesPeonLifecycle);
EasyMock.replay(runningKubernetesPeonLifecycle);
runner.registerListener(taskRunnerListener, executor);
verifyAll();
EasyMock.verify(kubernetesPeonLifecycle);
EasyMock.verify(runningKubernetesPeonLifecycle);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;

@Deprecated
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
public class UpdateLocationAction implements TaskAction<Void>
{
@JsonIgnore
Expand Down Expand Up @@ -58,10 +57,6 @@ public TypeReference<Void> getReturnTypeReference()
@Override
public Void perform(Task task, TaskActionToolbox toolbox)
{
Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
if (taskRunner.isPresent()) {
taskRunner.get().updateLocation(task, taskLocation);
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,6 @@ default void updateStatus(Task task, TaskStatus status)
// do nothing
}

default void updateLocation(Task task, TaskLocation location)
{
// do nothing
}

/**
* The maximum number of tasks this TaskRunner can run concurrently.
* Can return -1 if this method is not implemented or capacity can't be found.
Expand Down

This file was deleted.

Loading