diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 89d33c7404b7..1088e10bf2b9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerUtils; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -48,6 +49,7 @@ import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; @@ -106,6 +108,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner private final HttpClient httpClient; private final PeonLifecycleFactory peonLifecycleFactory; private final ServiceEmitter emitter; + private final TaskStorage taskStorage; public KubernetesTaskRunner( @@ -114,7 +117,8 @@ public KubernetesTaskRunner( KubernetesPeonClient client, HttpClient httpClient, PeonLifecycleFactory peonLifecycleFactory, - ServiceEmitter emitter + ServiceEmitter emitter, + TaskStorage taskStorage ) { this.adapter = adapter; @@ -127,6 +131,7 @@ public KubernetesTaskRunner( Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d") ); this.emitter = emitter; + this.taskStorage = taskStorage; } @Override @@ -312,10 +317,17 @@ public Optional streamTaskReports(String taskid) throws IOException public List>> restore() { List>> restoredTasks = new ArrayList<>(); + Map tasksFromStorage = taskStorage.getActiveTasks().stream().collect(Collectors.toMap(Task::getId, task -> task)); for (Job job : client.getPeonJobs()) { try { - Task task = adapter.toTask(job); - restoredTasks.add(Pair.of(task, joinAsync(task))); + String taskId = adapter.getTaskId(job); + Task task = tasksFromStorage.get(taskId); + if (task == null) { + log.warn("Found K8s job running task id %s that was not in taskStorage during restore, deleting it.", taskId); + client.deletePeonJob(new K8sTaskId(taskId)); + continue; + } + restoredTasks.add(Pair.of(tasksFromStorage.get(taskId), joinAsync(task))); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 76698ba8fe31..14d6c7191f8a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.guice.annotations.Smile; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.overlord.TaskRunnerFactory; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; @@ -57,6 +58,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory envVars = podSpec.getContainers().get(0).getEnv(); - Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst(); - String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null); - if (contents == null) { - throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName()); + Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); + if (annotations == null) { + throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); } - return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class); + String taskId = annotations.get(DruidK8sConstants.TASK_ID); + if (taskId == null) { + throw new IOE("No task.id annotation found on pod spec for job [%s]", from.getMetadata().getName()); + } + return taskId; } @VisibleForTesting diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java index 661f5fb568b3..d7612051d1e2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java @@ -30,7 +30,6 @@ import io.fabric8.kubernetes.api.model.EnvVarBuilder; import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder; import io.fabric8.kubernetes.api.model.ObjectFieldSelector; -import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; @@ -148,28 +147,18 @@ public Job fromTask(Task task) throws IOException .build(); } - /** - * Transform a {@link Pod} to a {@link Task} - * - * 1. Find task annotation on the pod - * 2. Base 64 decode and decompress task, read into {@link Task} - * - * @param from - * @return {@link Task} - * @throws IOException - */ @Override - public Task toTask(Job from) throws IOException + public String getTaskId(Job from) throws IOException { Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations(); if (annotations == null) { throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName()); } - String task = annotations.get(DruidK8sConstants.TASK); - if (task == null) { - throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName()); + String taskId = annotations.get(DruidK8sConstants.TASK_ID); + if (taskId == null) { + throw new IOE("No task.id annotation found on pod spec for job [%s]", from.getMetadata().getName()); } - return mapper.readValue(Base64Compression.decompressBase64(task), Task.class); + return taskId; } private HashMap initializePodTemplates(Properties properties) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java index 05933604f2ba..21cf1328b3db 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java @@ -29,6 +29,6 @@ public interface TaskAdapter Job fromTask(Task task) throws IOException; - Task toTask(Job from) throws IOException; + String getTaskId(Job from) throws IOException; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java index ba9d2accf170..634d284817bd 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java @@ -23,6 +23,7 @@ import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; @@ -54,6 +55,8 @@ public class KubernetesTaskRunnerFactoryTest private DruidKubernetesClient druidKubernetesClient; @Mock private ServiceEmitter emitter; + @Mock private TaskStorage taskStorage; + @Before public void setup() { @@ -90,7 +93,8 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild() taskConfig, properties, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner expectedRunner = factory.build(); @@ -112,7 +116,8 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing taskConfig, properties, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -139,7 +144,8 @@ public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiCo taskConfig, properties, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -164,7 +170,8 @@ public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunne taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -194,7 +201,8 @@ public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsIAE taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); Assert.assertThrows( @@ -225,7 +233,8 @@ public void test_build_withMultiContainerAdapterType_returnsKubernetesTaskRunner taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -250,7 +259,8 @@ public void test_build_withMultiContainerAdapterTypeAndSidecarSupport_returnsKub taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); @@ -278,7 +288,8 @@ public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWit taskConfig, props, druidKubernetesClient, - emitter + emitter, + taskStorage ); KubernetesTaskRunner runner = factory.build(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 0359488802eb..c6fcce853493 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -32,12 +32,14 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler; +import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.easymock.EasyMock; @@ -76,6 +78,7 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Mock private KubernetesPeonClient peonClient; @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle; @Mock private ServiceEmitter emitter; + @Mock private TaskStorage taskStorage; private KubernetesTaskRunnerConfig config; private KubernetesTaskRunner runner; @@ -96,7 +99,8 @@ public void setup() peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + taskStorage ); } @@ -294,7 +298,8 @@ public void test_restore_withExistingJobs() throws IOException peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + taskStorage ) { @Override protected ListenableFuture joinAsync(Task task) @@ -310,7 +315,8 @@ protected ListenableFuture joinAsync(Task task) .build(); EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task)); replayAll(); @@ -331,7 +337,8 @@ public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws I peonClient, httpClient, new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter + emitter, + taskStorage ) { @Override protected ListenableFuture joinAsync(Task task) @@ -347,7 +354,48 @@ protected ListenableFuture joinAsync(Task task) .build(); EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException()); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()); + EasyMock.expect(taskAdapter.getTaskId(job)).andThrow(new IOException()); + + replayAll(); + + List>> tasks = runner.restore(); + + verifyAll(); + + Assert.assertNotNull(tasks); + Assert.assertEquals(0, tasks.size()); + } + + @Test + public void test_restore_missingFromTaskStorage_cleanupJob() throws IOException + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter, + taskStorage + ) { + @Override + protected ListenableFuture joinAsync(Task task) + { + return new KubernetesWorkItem(task, null).getResult(); + } + }; + + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); + EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID); + EasyMock.expect(peonClient.deletePeonJob(new K8sTaskId(ID))).andReturn(true); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()); replayAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 327e4276d1cd..721914b5ee59 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -154,8 +154,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception thread.start(); // assert that the env variable is corret - Task taskFromEnvVar = adapter.toTask(job); - assertEquals(task, taskFromEnvVar); + String taskIdFromEnvVar = adapter.getTaskId(job); + assertEquals(task.getId(), taskIdFromEnvVar); // now copy the task.json file from the pod and make sure its the same as our task.json we expected Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json"); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index 519e7177cbe5..4573bdcfcf27 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -185,8 +185,8 @@ public void serializingAndDeserializingATask() throws IOException assertEquals(2400000000L, Long.valueOf(amount)); assertTrue(StringUtils.isBlank(containerMemory.getFormat())); // no units specified we talk in bytes - Task taskFromJob = adapter.toTask(Iterables.getOnlyElement(jobList.getItems())); - assertEquals(task, taskFromJob); + String taskId = adapter.getTaskId(Iterables.getOnlyElement(jobList.getItems())); + assertEquals(task.getId(), taskId); } @Test diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java index 7f84c69caef4..ba60c520736d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java @@ -242,7 +242,7 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce } @Test - public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException + public void test_getTaskId_withoutAnnotations_throwsIOE() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -261,11 +261,11 @@ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); + Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job)); } @Test - public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException + public void test_getTaskId_withoutTaskAnnotation_throwsIOE() throws IOException { Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); mapper.writeValue(templatePath.toFile(), podTemplateSpec); @@ -292,31 +292,7 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException .endTemplate() .endSpec() .build(); - Assert.assertThrows(IOE.class, () -> adapter.toTask(job)); - } - - @Test - public void test_fromTask() throws IOException - { - Path templatePath = Files.createFile(tempDir.resolve("base.yaml")); - mapper.writeValue(templatePath.toFile(), podTemplateSpec); - - Properties props = new Properties(); - props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString()); - - PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter( - taskRunnerConfig, - taskConfig, - node, - mapper, - props - ); - - Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class); - Task actual = adapter.toTask(job); - Task expected = NoopTask.create("id", 1); - - Assertions.assertEquals(expected, actual); + Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job)); } @Test