diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 1a8aef11e6c2..1088e10bf2b9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -49,6 +49,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; @@ -322,7 +323,8 @@ public List>> restore() String taskId = adapter.getTaskId(job); Task task = tasksFromStorage.get(taskId); if (task == null) { - log.warn("Found K8s job running task id %s that was not in taskStorage during restore, ignoring it.", taskId); + log.warn("Found K8s job running task id %s that was not in taskStorage during restore, deleting it.", taskId); + client.deletePeonJob(new K8sTaskId(taskId)); continue; } restoredTasks.add(Pair.of(tasksFromStorage.get(taskId), joinAsync(task))); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index a7c8bbc8a33e..c6fcce853493 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.easymock.EasyMock; @@ -366,6 +367,46 @@ protected ListenableFuture joinAsync(Task task) Assert.assertEquals(0, tasks.size()); } + @Test + public void test_restore_missingFromTaskStorage_cleanupJob() throws IOException + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter, + taskStorage + ) { + @Override + protected ListenableFuture joinAsync(Task task) + { + return new KubernetesWorkItem(task, null).getResult(); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); + EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID); + EasyMock.expect(peonClient.deletePeonJob(new K8sTaskId(ID))).andReturn(true); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()); + + replayAll(); + + List>> tasks = runner.restore(); + + verifyAll(); + + Assert.assertNotNull(tasks); + Assert.assertEquals(0, tasks.size()); + } + @Test public void test_getTotalTaskSlotCount() {