From d452c9375ab659d9f66ab7547187cb80340e1027 Mon Sep 17 00:00:00 2001 From: YongGang Date: Thu, 24 Aug 2023 10:19:19 -0700 Subject: [PATCH 1/7] K8s tasks restore should be from lifecycle start --- .../k8s/overlord/KubernetesTaskRunner.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 24e21b0b4e0e..ce811dbc9345 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -309,23 +310,22 @@ public Optional streamTaskReports(String taskid) throws IOException @Override public List>> restore() { - List>> restoredTasks = new ArrayList<>(); + return ImmutableList.of(); + } + + @Override + @LifecycleStart + public void start() + { for (Job job : client.getPeonJobs()) { try { - Task task = adapter.toTask(job); - restoredTasks.add(Pair.of(task, joinAsync(task))); + joinAsync(adapter.toTask(job)); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - return restoredTasks; - } - @Override - @LifecycleStart - public void start() - { cleanupExecutor.scheduleAtFixedRate( () -> client.deleteCompletedPeonJobsOlderThan( @@ -339,7 +339,6 @@ public void start() log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay()); } - @Override @LifecycleStop public void stop() From e82213c2154fef02f28e7b94e441b7e0e2d0c0a9 Mon Sep 17 00:00:00 2001 From: YongGang Date: Fri, 25 Aug 2023 11:51:19 -0700 Subject: [PATCH 2/7] add test --- .../k8s/overlord/KubernetesTaskRunner.java | 2 +- .../overlord/KubernetesTaskRunnerTest.java | 99 +++++-------------- 2 files changed, 24 insertions(+), 77 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index ce811dbc9345..2c6ddb553441 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -61,7 +61,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -325,6 +324,7 @@ public void start() log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } + log.info("Loaded %,d tasks from previous run", tasks.size()); cleanupExecutor.scheduleAtFixedRate( () -> diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index bee3a533c7b8..215994a722ad 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.java.util.http.client.HttpClient; @@ -56,7 +55,6 @@ import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -101,6 +99,29 @@ public void setup() ); } + @Test + public void test_start_withExistingJobs() throws IOException + { + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); + EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andReturn(TaskStatus.running(task.getId())); + + replayAll(); + + runner.start(); + + verifyAll(); + + Assert.assertNotNull(runner.tasks); + Assert.assertEquals(1, runner.tasks.size()); + } + @Test public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional() { @@ -263,80 +284,6 @@ public void test_shutdown_withoutExistingTask() runner.shutdown(task.getId(), ""); } - @Test - public void test_restore_withExistingJobs() throws IOException - { - KubernetesTaskRunner runner = new KubernetesTaskRunner( - taskAdapter, - config, - peonClient, - httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter - ) { - @Override - protected ListenableFuture joinAsync(Task task) - { - return new KubernetesWorkItem(task, null).getResult(); - } - }; - - Job job = new JobBuilder() - .withNewMetadata() - .withName(ID) - .endMetadata() - .build(); - - EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); - - replayAll(); - - List>> tasks = runner.restore(); - - verifyAll(); - - Assert.assertNotNull(tasks); - Assert.assertEquals(1, tasks.size()); - } - - @Test - public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws IOException - { - KubernetesTaskRunner runner = new KubernetesTaskRunner( - taskAdapter, - config, - peonClient, - httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter - ) { - @Override - protected ListenableFuture joinAsync(Task task) - { - return new KubernetesWorkItem(task, null).getResult(); - } - }; - - Job job = new JobBuilder() - .withNewMetadata() - .withName(ID) - .endMetadata() - .build(); - - EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); - EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException()); - - replayAll(); - - List>> tasks = runner.restore(); - - verifyAll(); - - Assert.assertNotNull(tasks); - Assert.assertEquals(0, tasks.size()); - } - @Test public void test_getTotalTaskSlotCount() { From ba00f9dac5be8f50b6246724b7cbbeaeabc71bb7 Mon Sep 17 00:00:00 2001 From: YongGang Date: Sun, 27 Aug 2023 14:25:31 -0700 Subject: [PATCH 3/7] add more tests --- .../overlord/KubernetesTaskRunnerTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 215994a722ad..182c9da420f7 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -122,6 +122,28 @@ public void test_start_withExistingJobs() throws IOException Assert.assertEquals(1, runner.tasks.size()); } + @Test + public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException + { + Job job = new JobBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); + EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException()); + + replayAll(); + + runner.start(); + + verifyAll(); + + Assert.assertNotNull(runner.tasks); + Assert.assertEquals(0, runner.tasks.size()); + } + @Test public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional() { From 25bec0a918eb29f15c218dfc9afc6c01eae6b627 Mon Sep 17 00:00:00 2001 From: YongGang Date: Sun, 27 Aug 2023 20:41:49 -0700 Subject: [PATCH 4/7] fix test --- .../overlord/KubernetesTaskRunnerTest.java | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 182c9da420f7..cc42655803dc 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -102,6 +102,23 @@ public void setup() @Test public void test_start_withExistingJobs() throws IOException { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ) + { + @Override + protected ListenableFuture joinAsync(Task task) + { + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) + .getResult(); + } + }; + Job job = new JobBuilder() .withNewMetadata() .withName(ID) @@ -110,7 +127,6 @@ public void test_start_withExistingJobs() throws IOException EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); - EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andReturn(TaskStatus.running(task.getId())); replayAll(); @@ -125,6 +141,23 @@ public void test_start_withExistingJobs() throws IOException @Test public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ) + { + @Override + protected ListenableFuture joinAsync(Task task) + { + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) + .getResult(); + } + }; + Job job = new JobBuilder() .withNewMetadata() .withName(ID) From 2d4a1a8f06a1b2371fb8a578f2f712f2f23073d7 Mon Sep 17 00:00:00 2001 From: YongGang Date: Tue, 29 Aug 2023 10:35:44 -0700 Subject: [PATCH 5/7] wait tasks restore finish when start --- .../druid/k8s/overlord/KubernetesTaskRunner.java | 12 +++++++++++- .../druid/k8s/overlord/KubernetesTaskRunnerTest.java | 5 ++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 2c6ddb553441..0c643fd5a3a2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -25,6 +25,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -61,6 +62,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -316,14 +318,22 @@ public List>> restore() @LifecycleStart public void start() { + List> tasksStatus = new ArrayList<>(); for (Job job : client.getPeonJobs()) { try { - joinAsync(adapter.toTask(job)); + tasksStatus.add(joinAsync(adapter.toTask(job))); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } + try { + Futures.allAsList(tasksStatus).get(); + } + catch (InterruptedException | ExecutionException e) { + // log the exception and proceed with startup + log.error(e, e.getMessage()); + } log.info("Loaded %,d tasks from previous run", tasks.size()); cleanupExecutor.scheduleAtFixedRate( diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index cc42655803dc..74bdc9f5a8e1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -114,7 +114,10 @@ public void test_start_withExistingJobs() throws IOException @Override protected ListenableFuture joinAsync(Task task) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) + return tasks.computeIfAbsent( + task.getId(), + k -> new KubernetesWorkItem(task, Futures.immediateFuture(TaskStatus.success(task.getId()))) + ) .getResult(); } }; From 9414030e5a9b9cf984fe155e10f39b294df575fc Mon Sep 17 00:00:00 2001 From: YongGang Date: Tue, 29 Aug 2023 11:32:33 -0700 Subject: [PATCH 6/7] fix style --- .../druid/k8s/overlord/KubernetesTaskRunnerTest.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 74bdc9f5a8e1..b54a820c55dd 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -115,10 +115,12 @@ public void test_start_withExistingJobs() throws IOException protected ListenableFuture joinAsync(Task task) { return tasks.computeIfAbsent( - task.getId(), - k -> new KubernetesWorkItem(task, Futures.immediateFuture(TaskStatus.success(task.getId()))) - ) - .getResult(); + task.getId(), + k -> new KubernetesWorkItem( + task, + Futures.immediateFuture(TaskStatus.success(task.getId())) + ) + ).getResult(); } }; From a360c2287517dd9543ae7832b8a0df09baa0a179 Mon Sep 17 00:00:00 2001 From: YongGang Date: Thu, 14 Sep 2023 17:52:59 -0700 Subject: [PATCH 7/7] revert previous change and add comment --- .../druid/k8s/overlord/KubernetesTaskRunner.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 0c643fd5a3a2..1272ceacb684 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -25,7 +25,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -62,7 +61,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -318,22 +316,16 @@ public List>> restore() @LifecycleStart public void start() { - List> tasksStatus = new ArrayList<>(); + log.info("Starting K8sTaskRunner..."); + // Load tasks from previously running jobs and wait for their statuses to be updated asynchronously. for (Job job : client.getPeonJobs()) { try { - tasksStatus.add(joinAsync(adapter.toTask(job))); + joinAsync(adapter.toTask(job)); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - try { - Futures.allAsList(tasksStatus).get(); - } - catch (InterruptedException | ExecutionException e) { - // log the exception and proceed with startup - log.error(e, e.getMessage()); - } log.info("Loaded %,d tasks from previous run", tasks.size()); cleanupExecutor.scheduleAtFixedRate(