From 0b0559b94b3948d0e2e8613e77803e55db97681e Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 10 Aug 2023 10:22:09 -0400 Subject: [PATCH 1/3] Deprecate toTask --- .../k8s/overlord/KubernetesTaskRunner.java | 20 +++++++++++--- .../overlord/KubernetesTaskRunnerFactory.java | 9 +++++-- .../overlord/taskadapter/K8sTaskAdapter.java | 14 ++++++++++ .../taskadapter/PodTemplateTaskAdapter.java | 13 +++++++++ .../k8s/overlord/taskadapter/TaskAdapter.java | 2 ++ .../KubernetesTaskRunnerFactoryTest.java | 27 +++++++++++++------ .../overlord/KubernetesTaskRunnerTest.java | 11 +++++--- 7 files changed, 80 insertions(+), 16 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 89d33c7404b7..0eeeca7c0e27 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerUtils; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -61,6 +62,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -106,6 +108,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final HttpClient httpClient; private final PeonLifecycleFactory peonLifecycleFactory; private final ServiceEmitter emitter; + private final TaskStorage taskStorage; public KubernetesTaskRunner( @@ -114,7 +117,8 @@ public KubernetesTaskRunner( KubernetesPeonClient client, HttpClient httpClient, PeonLifecycleFactory peonLifecycleFactory, - ServiceEmitter emitter + ServiceEmitter emitter, + TaskStorage taskStorage ) { this.adapter = adapter; @@ -127,6 +131,7 @@ public KubernetesTaskRunner( Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d") ); this.emitter = emitter; + this.taskStorage = taskStorage; } @Override @@ -312,10 +317,19 @@ public Optional streamTaskReports(String taskid) throws IOException public List>> restore() { List>> restoredTasks = new ArrayList<>(); + Map tasksFromStorage = new HashMap<>(); + for (Task task: taskStorage.getActiveTasks()) { + tasksFromStorage.put(task.getId(), task); + } for (Job job : client.getPeonJobs()) { try { - Task task = adapter.toTask(job); - restoredTasks.add(Pair.of(task, joinAsync(task))); + String taskId = adapter.getTaskId(job); + Task task = tasksFromStorage.get(taskId); + if (task == null) { + log.warn("Found K8s job running task id %s that was not in taskStorage during restore, ignoring it.", taskId); + continue; + } + restoredTasks.add(Pair.of(tasksFromStorage.get(taskId), joinAsync(task))); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 76698ba8fe31..14d6c7191f8a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; @@ -57,6 +58,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); + if (annotations == null) { + throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); + } + String taskId = annotations.get(DruidK8sConstants.TASK_ID); + if (taskId == null) { + throw new IOE("No task.id annotation found on pod spec for job [%s]", from.getMetadata().getName()); + } + return taskId; + } + @VisibleForTesting abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index 661f5fb568b3..c90f19e78f27 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -172,6 +172,19 @@ public Task toTask(Job from) throws IOException return mapper.readValue(Base64Compression.decompressBase64(task), Task.class); } + @Override + public String getTaskId(Job from) throws IOException { + Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); + if (annotations == null) { + throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); + } + String taskId = annotations.get(DruidK8sConstants.TASK_ID); + if (taskId == null) { + throw new IOE("No task.id annotation found on pod spec for job [%s]", from.getMetadata().getName()); + } + return taskId; + } + private HashMap initializePodTemplates(Properties properties) { HashMap podTemplateMap = new HashMap<>(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java index 05933604f2ba..51f8db80bd6e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java @@ -31,4 +31,6 @@ public interface TaskAdapter Task toTask(Job from) throws IOException; + String getTaskId(Job from) throws IOException; + } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java index ba9d2accf170..634d284817bd 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java @@ -23,6 +23,7 @@ import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; @@ -54,6 +55,8 @@ public class KubernetesTaskRunnerFactoryTest private DruidKubernetesClient druidKubernetesClient; @Mock private ServiceEmitter emitter; + @Mock private TaskStorage taskStorage; + @Before public void setup() { @@ -90,7 +93,8 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild() taskConfig, properties, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner expectedRunner = factory.build(); @@ -112,7 +116,8 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing taskConfig, properties, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -139,7 +144,8 @@ public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiCo taskConfig, properties, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -164,7 +170,8 @@ public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunne taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -194,7 +201,8 @@ public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsIAE taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); Assert.assertThrows( @@ -225,7 +233,8 @@ public void test_build_withMultiContainerAdapterType_returnsKubernetesTaskRunner taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -250,7 +259,8 @@ public void test_build_withMultiContainerAdapterTypeAndSidecarSupport_returnsKub taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -278,7 +288,8 @@ public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWit taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 0359488802eb..0dd1d0c9307a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; @@ -76,6 +77,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Mock private KubernetesPeonClient peonClient; @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle; @Mock private ServiceEmitter emitter; + @Mock private TaskStorage taskStorage; private KubernetesTaskRunnerConfig config; private KubernetesTaskRunner runner; @@ -96,7 +98,8 @@ public void setup() peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + taskStorage ); } @@ -294,7 +297,8 @@ public void test_restore_withExistingJobs() throws IOException peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + taskStorage ) { @Override protected ListenableFuture joinAsync(Task task) @@ -331,7 +335,8 @@ public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws I peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + taskStorage ) { @Override protected ListenableFuture joinAsync(Task task) From ea6aef02e849ff0ccfbd84cf8c479cf70999575b Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 10 Aug 2023 13:23:19 -0400 Subject: [PATCH 2/3] Finish deprecating toTask --- .../k8s/overlord/KubernetesTaskRunner.java | 6 +--- .../overlord/taskadapter/K8sTaskAdapter.java | 17 +--------- .../taskadapter/PodTemplateTaskAdapter.java | 26 +-------------- .../k8s/overlord/taskadapter/TaskAdapter.java | 2 -- .../overlord/KubernetesTaskRunnerTest.java | 6 ++-- .../DruidPeonClientIntegrationTest.java | 4 +-- .../taskadapter/K8sTaskAdapterTest.java | 4 +-- .../PodTemplateTaskAdapterTest.java | 32 +++---------------- 8 files changed, 15 insertions(+), 82 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 0eeeca7c0e27..1a8aef11e6c2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -62,7 +62,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -317,10 +316,7 @@ public Optional streamTaskReports(String taskid) throws IOException public List>> restore() { List>> restoredTasks = new ArrayList<>(); - Map tasksFromStorage = new HashMap<>(); - for (Task task: taskStorage.getActiveTasks()) { - tasksFromStorage.put(task.getId(), task); - } + Map tasksFromStorage = taskStorage.getActiveTasks().stream().collect(Collectors.toMap(Task::getId, task -> task)); for (Job job : client.getPeonJobs()) { try { String taskId = adapter.getTaskId(job); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index 38ed5c2a1b6a..67016720f2de 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -28,7 +28,6 @@ import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerPort; -import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; import io.fabric8.kubernetes.api.model.ObjectFieldSelector; @@ -51,7 +50,6 @@ import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; -import org.apache.druid.k8s.overlord.common.Base64Compression; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesClientApi; @@ -125,21 +123,8 @@ public Job fromTask(Task task) throws IOException } @Override - public Task toTask(Job from) throws IOException + public String getTaskId(Job from) throws IOException { - PodSpec podSpec = from.getSpec().getTemplate().getSpec(); - massageSpec(podSpec, "main"); - List envVars = podSpec.getContainers().get(0).getEnv(); - Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst(); - String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null); - if (contents == null) { - throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName()); - } - return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class); - } - - @Override - public String getTaskId(Job from) throws IOException { Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index c90f19e78f27..d7612051d1e2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -30,7 +30,6 @@ import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; import io.fabric8.kubernetes.api.model.ObjectFieldSelector; -import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; @@ -148,32 +147,9 @@ public Job fromTask(Task task) throws IOException .build(); } - /** - * Transform a {@link Pod} to a {@link Task} - * - * 1. Find task annotation on the pod - * 2. Base 64 decode and decompress task, read into {@link Task} - * - * @param from - * @return {@link Task} - * @throws IOException - */ @Override - public Task toTask(Job from) throws IOException + public String getTaskId(Job from) throws IOException { - Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); - if (annotations == null) { - throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); - } - String task = annotations.get(DruidK8sConstants.TASK); - if (task == null) { - throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName()); - } - return mapper.readValue(Base64Compression.decompressBase64(task), Task.class); - } - - @Override - public String getTaskId(Job from) throws IOException { Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java index 51f8db80bd6e..21cf1328b3db 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java @@ -29,8 +29,6 @@ public interface TaskAdapter Job fromTask(Task task) throws IOException; - Task toTask(Job from) throws IOException; - String getTaskId(Job from) throws IOException; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 0dd1d0c9307a..a7c8bbc8a33e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -314,7 +314,8 @@ protected ListenableFuture joinAsync(Task task) .build(); EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)); replayAll(); @@ -352,7 +353,8 @@ protected ListenableFuture joinAsync(Task task) .build(); EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException()); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()); + EasyMock.expect(taskAdapter.getTaskId(job)).andThrow(new IOException()); replayAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 327e4276d1cd..721914b5ee59 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -154,8 +154,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception thread.start(); // assert that the env variable is corret - Task taskFromEnvVar = adapter.toTask(job); - assertEquals(task, taskFromEnvVar); + String taskIdFromEnvVar = adapter.getTaskId(job); + assertEquals(task.getId(), taskIdFromEnvVar); // now copy the task.json file from the pod and make sure its the same as our task.json we expected Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json"); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index 519e7177cbe5..4573bdcfcf27 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -185,8 +185,8 @@ public void serializingAndDeserializingATask() throws IOException assertEquals(2400000000L, Long.valueOf(amount)); assertTrue(StringUtils.isBlank(containerMemory.getFormat())); // no units specified we talk in bytes - Task taskFromJob = adapter.toTask(Iterables.getOnlyElement(jobList.getItems())); - assertEquals(task, taskFromJob); + String taskId = adapter.getTaskId(Iterables.getOnlyElement(jobList.getItems())); + assertEquals(task.getId(), taskId); } @Test diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index 7f84c69caef4..ba60c520736d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -242,7 +242,7 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce } @Test - public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException + public void test_getTaskId_withoutAnnotations_throwsIOE() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -261,11 +261,11 @@ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); + Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job)); } @Test - public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException + public void test_getTaskId_withoutTaskAnnotation_throwsIOE() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -292,31 +292,7 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException .endTemplate() .endSpec() .build(); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); - } - - @Test - public void test_fromTask() throws IOException - { - Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); - mapper.writeValue(templatePath.toFile(), podTemplateSpec); - - Properties props = new Properties(); - props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); - - PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( - taskRunnerConfig, - taskConfig, - node, - mapper, - props - ); - - Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class); - Task actual = adapter.toTask(job); - Task expected = NoopTask.create("id", 1); - - Assertions.assertEquals(expected, actual); + Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job)); } @Test From a966ebff2c63b83d0896ca123191305ec70f18c0 Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 11 Aug 2023 16:47:34 -0400 Subject: [PATCH 3/3] delete jobs --- .../k8s/overlord/KubernetesTaskRunner.java | 4 +- .../overlord/KubernetesTaskRunnerTest.java | 41 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 1a8aef11e6c2..1088e10bf2b9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -49,6 +49,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; @@ -322,7 +323,8 @@ public List>> restore() String taskId = adapter.getTaskId(job); Task task = tasksFromStorage.get(taskId); if (task == null) { - log.warn("Found K8s job running task id %s that was not in taskStorage during restore, ignoring it.", taskId); + log.warn("Found K8s job running task id %s that was not in taskStorage during restore, deleting it.", taskId); + client.deletePeonJob(new K8sTaskId(taskId)); continue; } restoredTasks.add(Pair.of(tasksFromStorage.get(taskId), joinAsync(task))); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index a7c8bbc8a33e..c6fcce853493 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -39,6 +39,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.easymock.EasyMock; @@ -366,6 +367,46 @@ protected ListenableFuture joinAsync(Task task) Assert.assertEquals(0, tasks.size()); } + @Test + public void test_restore_missingFromTaskStorage_cleanupJob() throws IOException + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter, + taskStorage + ) { + @Override + protected ListenableFuture joinAsync(Task task) + { + return new KubernetesWorkItem(task, null).getResult(); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); + EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID); + EasyMock.expect(peonClient.deletePeonJob(new K8sTaskId(ID))).andReturn(true); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()); + + replayAll(); + + List>> tasks = runner.restore(); + + verifyAll(); + + Assert.assertNotNull(tasks); + Assert.assertEquals(0, tasks.size()); + } + @Test public void test_getTotalTaskSlotCount() {