From ea6aef02e849ff0ccfbd84cf8c479cf70999575b Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 10 Aug 2023 13:23:19 -0400 Subject: [PATCH] Finish deprecating toTask --- .../k8s/overlord/KubernetesTaskRunner.java | 6 +--- .../overlord/taskadapter/K8sTaskAdapter.java | 17 +--------- .../taskadapter/PodTemplateTaskAdapter.java | 26 +-------------- .../k8s/overlord/taskadapter/TaskAdapter.java | 2 -- .../overlord/KubernetesTaskRunnerTest.java | 6 ++-- .../DruidPeonClientIntegrationTest.java | 4 +-- .../taskadapter/K8sTaskAdapterTest.java | 4 +-- .../PodTemplateTaskAdapterTest.java | 32 +++---------------- 8 files changed, 15 insertions(+), 82 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 0eeeca7c0e27..1a8aef11e6c2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -62,7 +62,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -317,10 +316,7 @@ public Optional streamTaskReports(String taskid) throws IOException public List>> restore() { List>> restoredTasks = new ArrayList<>(); - Map tasksFromStorage = new HashMap<>(); - for (Task task: taskStorage.getActiveTasks()) { - tasksFromStorage.put(task.getId(), task); - } + Map tasksFromStorage = taskStorage.getActiveTasks().stream().collect(Collectors.toMap(Task::getId, task -> task)); for (Job job : client.getPeonJobs()) { try { String taskId = adapter.getTaskId(job); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index 38ed5c2a1b6a..67016720f2de 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -28,7 +28,6 @@ import com.google.common.collect.Lists; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerPort; -import io.fabric8.kubernetes.api.model.EnvVar; import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; import io.fabric8.kubernetes.api.model.ObjectFieldSelector; @@ -51,7 +50,6 @@ import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; -import org.apache.druid.k8s.overlord.common.Base64Compression; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesClientApi; @@ -125,21 +123,8 @@ public Job fromTask(Task task) throws IOException } @Override - public Task toTask(Job from) throws IOException + public String getTaskId(Job from) throws IOException { - PodSpec podSpec = from.getSpec().getTemplate().getSpec(); - massageSpec(podSpec, "main"); - List envVars = podSpec.getContainers().get(0).getEnv(); - Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst(); - String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null); - if (contents == null) { - throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName()); - } - return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class); - } - - @Override - public String getTaskId(Job from) throws IOException { Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index c90f19e78f27..d7612051d1e2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -30,7 +30,6 @@ import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; import io.fabric8.kubernetes.api.model.ObjectFieldSelector; -import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; @@ -148,32 +147,9 @@ public Job fromTask(Task task) throws IOException .build(); } - /** - * Transform a {@link Pod} to a {@link Task} - * - * 1. Find task annotation on the pod - * 2. Base 64 decode and decompress task, read into {@link Task} - * - * @param from - * @return {@link Task} - * @throws IOException - */ @Override - public Task toTask(Job from) throws IOException + public String getTaskId(Job from) throws IOException { - Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); - if (annotations == null) { - throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); - } - String task = annotations.get(DruidK8sConstants.TASK); - if (task == null) { - throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName()); - } - return mapper.readValue(Base64Compression.decompressBase64(task), Task.class); - } - - @Override - public String getTaskId(Job from) throws IOException { Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java index 51f8db80bd6e..21cf1328b3db 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java @@ -29,8 +29,6 @@ public interface TaskAdapter Job fromTask(Task task) throws IOException; - Task toTask(Job from) throws IOException; - String getTaskId(Job from) throws IOException; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 0dd1d0c9307a..a7c8bbc8a33e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -314,7 +314,8 @@ protected ListenableFuture joinAsync(Task task) .build(); EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)); replayAll(); @@ -352,7 +353,8 @@ protected ListenableFuture joinAsync(Task task) .build(); EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException()); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()); + EasyMock.expect(taskAdapter.getTaskId(job)).andThrow(new IOException()); replayAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 327e4276d1cd..721914b5ee59 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -154,8 +154,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception thread.start(); // assert that the env variable is corret - Task taskFromEnvVar = adapter.toTask(job); - assertEquals(task, taskFromEnvVar); + String taskIdFromEnvVar = adapter.getTaskId(job); + assertEquals(task.getId(), taskIdFromEnvVar); // now copy the task.json file from the pod and make sure its the same as our task.json we expected Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json"); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index 519e7177cbe5..4573bdcfcf27 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -185,8 +185,8 @@ public void serializingAndDeserializingATask() throws IOException assertEquals(2400000000L, Long.valueOf(amount)); assertTrue(StringUtils.isBlank(containerMemory.getFormat())); // no units specified we talk in bytes - Task taskFromJob = adapter.toTask(Iterables.getOnlyElement(jobList.getItems())); - assertEquals(task, taskFromJob); + String taskId = adapter.getTaskId(Iterables.getOnlyElement(jobList.getItems())); + assertEquals(task.getId(), taskId); } @Test diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index 7f84c69caef4..ba60c520736d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -242,7 +242,7 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce } @Test - public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException + public void test_getTaskId_withoutAnnotations_throwsIOE() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -261,11 +261,11 @@ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); + Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job)); } @Test - public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException + public void test_getTaskId_withoutTaskAnnotation_throwsIOE() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -292,31 +292,7 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException .endTemplate() .endSpec() .build(); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); - } - - @Test - public void test_fromTask() throws IOException - { - Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); - mapper.writeValue(templatePath.toFile(), podTemplateSpec); - - Properties props = new Properties(); - props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); - - PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( - taskRunnerConfig, - taskConfig, - node, - mapper, - props - ); - - Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class); - Task actual = adapter.toTask(job); - Task expected = NoopTask.create("id", 1); - - Assertions.assertEquals(expected, actual); + Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job)); } @Test