From 0a7db67de003e90dac341a099a7dd1f6706d0180 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 9 Oct 2023 09:30:45 -0400 Subject: [PATCH 01/12] Separate k8s and druid task lifecycles --- .../k8s/overlord/KubernetesPeonLifecycle.java | 20 ++- .../KubernetesPeonLifecycleFactory.java | 10 +- .../k8s/overlord/KubernetesTaskRunner.java | 52 ++++--- .../overlord/KubernetesTaskRunnerConfig.java | 2 +- .../k8s/overlord/KubernetesWorkItem.java | 21 ++- .../k8s/overlord/PeonLifecycleFactory.java | 7 +- .../k8s/overlord/common/JobResponse.java | 9 +- .../overlord/common/KubernetesPeonClient.java | 6 +- .../druid/k8s/overlord/common/PeonPhase.java | 62 --------- .../overlord/KubernetesPeonLifecycleTest.java | 130 +++++++++++++----- .../KubernetesTaskRunnerConfigTest.java | 4 +- .../overlord/KubernetesTaskRunnerTest.java | 21 ++- .../k8s/overlord/KubernetesWorkItemTest.java | 11 +- .../overlord/TestPeonLifecycleFactory.java | 7 +- .../k8s/overlord/common/JobResponseTest.java | 8 +- .../common/KubernetesPeonClientTest.java | 3 - .../k8s/overlord/common/PeonPhaseTest.java | 43 ------ .../DruidPeonClientIntegrationTest.java | 5 +- .../resources/expectedEphemeralOutput.yaml | 2 +- .../expectedMultiContainerOutput.yaml | 2 +- .../expectedMultiContainerOutputOrder.yaml | 2 +- .../src/test/resources/expectedNoopJob.yaml | 2 +- .../resources/expectedNoopJobLongIds.yaml | 2 +- .../resources/expectedNoopJobTlsEnabled.yaml | 2 +- .../src/test/resources/expectedPodSpec.yaml | 2 +- .../expectedSingleContainerOutput.yaml | 2 +- .../indexing/common/task/AbstractTask.java | 10 -- .../SeekableStreamIndexTaskRunner.java | 3 +- .../supervisor/SeekableStreamSupervisor.java | 7 + .../ServiceAnnouncingChatHandlerProvider.java | 4 +- 30 files changed, 235 insertions(+), 226 deletions(-) delete mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java delete mode 100644 
extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 5c6c7c6b3ebe..cb09decc9d5d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -31,6 +31,9 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.indexing.overlord.TaskRunnerUtils; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; @@ -47,6 +50,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -89,6 +94,8 @@ protected enum State private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; + private final List> listeners; + @MonotonicNonNull private LogWatch logWatch; @@ -99,7 +106,8 @@ protected KubernetesPeonLifecycle( KubernetesPeonClient kubernetesClient, TaskLogs taskLogs, ObjectMapper mapper, - TaskStateListener stateListener + TaskStateListener stateListener, + List> listeners ) { this.taskId = new K8sTaskId(task); @@ -108,6 +116,7 @@ protected 
KubernetesPeonLifecycle( this.taskLogs = taskLogs; this.mapper = mapper; this.stateListener = stateListener; + this.listeners = listeners; } /** @@ -178,7 +187,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio { try { updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); - + TaskRunnerUtils.notifyLocationChanged( + listeners, + task.getId(), + getTaskLocation() + ); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, timeout, @@ -190,9 +203,10 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio finally { try { saveLogs(); + shutdown(); } catch (Exception e) { - log.warn(e, "Log processing failed for task [%s]", taskId); + log.warn(e, "Cleanup failed for task [%s]", taskId); } stopTask(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java index bf4e3a712577..2998f3fc9212 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java @@ -21,9 +21,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.tasklogs.TaskLogs; +import java.util.List; +import java.util.concurrent.Executor; + public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory { private final KubernetesPeonClient client; @@ -42,14 +47,15 @@ public KubernetesPeonLifecycleFactory( } @Override - 
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners) { return new KubernetesPeonLifecycle( task, client, taskLogs, mapper, - stateListener + stateListener, + listeners ); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index a0a29dcbbb92..439a1d94effe 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -146,16 +146,20 @@ public Optional streamTaskLog(String taskid, long offset) public ListenableFuture run(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> { + exec.submit(() -> runTask(task)); + return new KubernetesWorkItem(task); + }).getResult(); } } protected ListenableFuture joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> { + exec.submit(() -> joinTask(task)); + return new KubernetesWorkItem(task); + }).getResult(); } } @@ -172,10 +176,12 @@ private TaskStatus joinTask(Task task) @VisibleForTesting protected TaskStatus doTask(Task task, boolean run) { + TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task Execution not started"); try { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( task, - 
this::emitTaskStateMetrics + this::emitTaskStateMetrics, + listeners ); synchronized (tasks) { @@ -188,7 +194,6 @@ protected TaskStatus doTask(Task task, boolean run) workItem.setKubernetesPeonLifecycle(peonLifecycle); } - TaskStatus taskStatus; if (run) { taskStatus = peonLifecycle.run( adapter.fromTask(task), @@ -201,15 +206,16 @@ protected TaskStatus doTask(Task task, boolean run) config.getTaskTimeout().toStandardDuration().getMillis() ); } - - updateStatus(task, taskStatus); - return taskStatus; } catch (Exception e) { log.error(e, "Task [%s] execution caught an exception", task.getId()); + taskStatus = TaskStatus.failure(task.getId(), "Execution while starting task execution"); throw new RuntimeException(e); } + finally { + updateStatus(task, taskStatus); + } } @VisibleForTesting @@ -242,13 +248,15 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String @Override public void updateStatus(Task task, TaskStatus status) { - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); - } + KubernetesWorkItem workItem = tasks.get(task.getId()); + if (workItem != null && !workItem.getResult().isDone()) { + log.info("Manually calling update status for [%s]", task.getId()); + workItem.setResult(status); + } - @Override - public void updateLocation(Task task, TaskLocation location) - { - TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location); + // Notify all listeners by default + log.info("Notifying listeners [%s]", task.getId()); + TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); } @Override @@ -399,7 +407,7 @@ public void unregisterListener(String listenerId) for (Pair pair : listeners) { if (pair.lhs != null && pair.lhs.getListenerId().equals(listenerId)) { listeners.remove(pair); - log.debug("Unregistered listener [%s]", listenerId); + log.info("Unregistered listener [%s]", listenerId); return; } } @@ -415,8 +423,18 @@ public void registerListener(TaskRunnerListener listener, Executor 
executor) } final Pair listenerPair = Pair.of(listener, executor); - log.debug("Registered listener [%s]", listener.getListenerId()); + log.info("Registered listener [%s]", listener.getListenerId()); listeners.add(listenerPair); + + for (Map.Entry entry : tasks.entrySet()) { + if (entry.getValue().isRunning()) { + TaskRunnerUtils.notifyLocationChanged( + ImmutableList.of(listenerPair), + entry.getKey(), + entry.getValue().getLocation() + ); + } + } } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 0d67c55b30aa..5fdcd9cfbb71 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig @JsonProperty @NotNull // how long to wait for the jobs to be cleaned up. 
- private Period taskCleanupDelay = new Period("P2D"); + private Period taskCleanupDelay = new Period("PT1H"); @JsonProperty @NotNull diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 94d4bbb67f63..ca8779d69b52 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -21,7 +21,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -36,9 +36,17 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem private final Task task; private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; - public KubernetesWorkItem(Task task, ListenableFuture statusFuture) + private final SettableFuture result; + + public KubernetesWorkItem(Task task) + { + this(task, SettableFuture.create()); + } + + public KubernetesWorkItem(Task task, SettableFuture result) { - super(task.getId(), statusFuture); + super(task.getId(), result); + this.result = result; this.task = task; } @@ -51,7 +59,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k protected synchronized void shutdown() { - if (this.kubernetesPeonLifecycle != null) { + if (this.kubernetesPeonLifecycle != null && !result.isDone()) { this.kubernetesPeonLifecycle.startWatchingLogs(); this.kubernetesPeonLifecycle.shutdown(); } @@ -119,4 +127,9 @@ public Task getTask() { return task; } + + public void 
setResult(TaskStatus status) + { + result.set(status); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java index 2a234ebc5786..2b180fb9dac0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java @@ -20,8 +20,13 @@ package org.apache.druid.k8s.overlord; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; + +import java.util.List; +import java.util.concurrent.Executor; public interface PeonLifecycleFactory { - KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener); + KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java index a7a8156468f6..26fe5b98f0bd 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java @@ -32,12 +32,10 @@ public class JobResponse private static final EmittingLogger LOGGER = new EmittingLogger(JobResponse.class); private final Job job; - private final PeonPhase phase; - public JobResponse(@Nullable Job job, PeonPhase phase) + public JobResponse(@Nullable Job job) { this.job = job; - 
this.phase = phase; } public Job getJob() @@ -45,11 +43,6 @@ public Job getJob() return job; } - public PeonPhase getPhase() - { - return phase; - } - public long getJobDuration() { long duration = -1L; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index 9fdc25fa6455..147ea732d0ca 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -101,13 +101,13 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time ); if (job == null) { log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId); - return new JobResponse(null, PeonPhase.FAILED); + return new JobResponse(null); } if (job.getStatus().getSucceeded() != null) { - return new JobResponse(job, PeonPhase.SUCCEEDED); + return new JobResponse(job); } log.warn("Task %s failed with status %s", taskId, job.getStatus()); - return new JobResponse(job, PeonPhase.FAILED); + return new JobResponse(job); }); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java deleted file mode 100644 index 6efcd34872b8..000000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.k8s.overlord.common; - -import io.fabric8.kubernetes.api.model.Pod; - -import java.util.Arrays; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -public enum PeonPhase -{ - PENDING("Pending"), - SUCCEEDED("Succeeded"), - FAILED("Failed"), - UNKNOWN("Unknown"), - RUNNING("Running"); - - private static final Map PHASE_MAP = Arrays.stream(PeonPhase.values()) - .collect(Collectors.toMap( - PeonPhase::getPhase, - Function.identity() - )); - private final String phase; - - PeonPhase(String phase) - { - this.phase = phase; - } - - public String getPhase() - { - return phase; - } - - public static PeonPhase getPhaseFor(Pod pod) - { - if (pod == null) { - return UNKNOWN; - } - return PHASE_MAP.get(pod.getStatus().getPhase()); - } - -} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 1c6e429a3dc3..25f7039bf57d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -31,11 +32,12 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; -import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.tasklogs.TaskLogs; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -50,6 +52,8 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -65,6 +69,8 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport @Mock LogWatch logWatch; @Mock KubernetesPeonLifecycle.TaskStateListener stateListener; + List> listeners = ImmutableList.of(); + private ObjectMapper mapper; private Task task; private K8sTaskId k8sTaskId; @@ -86,7 +92,8 @@ public void test_run() throws IOException kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { @Override @@ -131,7 +138,8 @@ public void test_run_useTaskManager() throws IOException kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { 
@Override @@ -175,7 +183,8 @@ public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() throw kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { @Override @@ -268,15 +277,19 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(null, PeonPhase.FAILED)); + )).andReturn(new JobResponse(null)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); + EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -307,7 +320,8 @@ public void test_join() throws IOException kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -325,8 +339,12 @@ public void test_join() throws IOException EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( 
IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) )); @@ -359,7 +377,8 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -375,8 +394,15 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + + // Only update the location the first time, second call doesn't reach this point in the logic + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + // Always try to delete the job + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true).times(2); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -419,7 +445,8 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -435,8 +462,12 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new 
PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -469,7 +500,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -485,8 +517,12 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -519,7 +555,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -535,8 +572,12 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); 
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -549,6 +590,9 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th logWatch.close(); EasyMock.expectLastCall(); + // We should still try to cleanup the Job after + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -569,7 +613,8 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( @@ -578,6 +623,9 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr EasyMock.eq(TimeUnit.MILLISECONDS) )).andThrow(new RuntimeException()); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); // We should still try to push logs EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); @@ -608,7 +656,8 @@ public void test_shutdown_withNotStartedTaskState() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); peonLifecycle.shutdown(); } @@ -621,7 +670,8 @@ public void 
test_shutdown_withPendingTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -642,7 +692,8 @@ public void test_shutdown_withRunningTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -663,7 +714,8 @@ public void test_shutdown_withStoppedTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); @@ -678,7 +730,8 @@ public void test_streamLogs_withNotStartedTaskState() throws NoSuchFieldExceptio kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED); @@ -693,7 +746,8 @@ public void test_streamLogs_withPendingTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -708,7 +762,8 @@ public void test_streamLogs_withRunningTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -731,7 +786,8 @@ public void test_streamLogs_withStoppedTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); @@ -747,7 +803,8 @@ public void test_getTaskLocation_withNotStartedTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); 
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED); @@ -763,7 +820,8 @@ public void test_getTaskLocation_withPendingTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -779,7 +837,8 @@ public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnkn kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -801,7 +860,8 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_r kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -829,7 +889,8 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_retu kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -865,7 +926,8 @@ public void test_getTaskLocation_saveTaskLocation() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -901,7 +963,8 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithT kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -938,7 +1001,8 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once(); diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java index 1f4a7281f649..579b7539d818 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java @@ -47,7 +47,7 @@ public void test_deserializable() throws IOException Assert.assertNull(config.getGraceTerminationPeriodSeconds()); Assert.assertTrue(config.isDisableClientProxy()); Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout()); - Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay()); + Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay()); Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval()); Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout()); Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors()); @@ -72,7 +72,7 @@ public void test_builder_preservesDefaults() Assert.assertNull(config.getGraceTerminationPeriodSeconds()); Assert.assertTrue(config.isDisableClientProxy()); Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout()); - Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay()); + Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay()); Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval()); Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout()); Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 36a7b4cfcd9c..30a8e47928c0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -28,6 +28,7 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; @@ -116,11 +117,7 @@ protected ListenableFuture joinAsync(Task task) { return tasks.computeIfAbsent( task.getId(), - k -> new KubernetesWorkItem( - task, - Futures.immediateFuture(TaskStatus.success(task.getId())) - ) - ).getResult(); + k -> new KubernetesWorkItem(task)).getResult(); } }; @@ -249,7 +246,7 @@ public void test_run_withExistingTask_returnsExistingWorkItem() } @Test - public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOException + public void test_run_whenExceptionThrown_throwsRuntimeException() throws Exception { Job job = new JobBuilder() .withNewMetadata() @@ -269,9 +266,8 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOExcep replayAll(); ListenableFuture future = runner.run(task); - - Exception e = Assert.assertThrows(ExecutionException.class, future::get); - Assert.assertTrue(e.getCause() instanceof RuntimeException); + TaskStatus taskStatus = future.get(); + Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); verifyAll(); } @@ -303,16 +299,15 @@ public void test_join_withExistingTask_returnsExistingWorkItem() } @Test - public void test_join_whenExceptionThrown_throwsRuntimeException() + public void 
test_join_whenExceptionThrown_throwsRuntimeException() throws ExecutionException, InterruptedException { EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andThrow(new IllegalStateException()); replayAll(); ListenableFuture future = runner.joinAsync(task); - - Exception e = Assert.assertThrows(ExecutionException.class, future::get); - Assert.assertTrue(e.getCause() instanceof RuntimeException); + TaskStatus taskStatus = future.get(); + Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); verifyAll(); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 7d17193b1714..f2f398658e05 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -56,6 +56,7 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() null, null, null, + null, null )); @@ -66,6 +67,7 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() null, null, null, + null, null )) ); @@ -80,6 +82,8 @@ public void test_shutdown_withoutKubernetesPeonLifecycle() @Test public void test_shutdown_withKubernetesPeonLifecycle() { + KubernetesWorkItem workItem = new KubernetesWorkItem(task); + kubernetesPeonLifecycle.shutdown(); EasyMock.expectLastCall(); kubernetesPeonLifecycle.startWatchingLogs(); @@ -87,7 +91,6 @@ public void test_shutdown_withKubernetesPeonLifecycle() replayAll(); workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); - workItem.shutdown(); verifyAll(); } @@ -158,6 +161,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending() null, null, null, + null, null )); @@ 
-172,6 +176,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inPendingState_r null, null, null, + null, null ) { @Override @@ -194,6 +199,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inRunningState_r null, null, null, + null, null ) { @Override @@ -216,6 +222,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inStoppedState_r null, null, null, + null, null ) { @Override @@ -244,6 +251,7 @@ public void test_streamTaskLogs_withKubernetesPeonLifecycle() null, null, null, + null, null )); Assert.assertFalse(workItem.streamTaskLogs().isPresent()); @@ -263,6 +271,7 @@ public void test_getLocation_withKubernetesPeonLifecycle() null, null, null, + null, null )); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java index 8b8c43c0d71c..fa0f79bc1d20 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java @@ -20,6 +20,11 @@ package org.apache.druid.k8s.overlord; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; + +import java.util.List; +import java.util.concurrent.Executor; public class TestPeonLifecycleFactory implements PeonLifecycleFactory { @@ -31,7 +36,7 @@ public TestPeonLifecycleFactory(KubernetesPeonLifecycle kubernetesPeonLifecycle) } @Override - public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners) { return 
kubernetesPeonLifecycle; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java index 2e2043578aa1..cf9f345fd7a5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java @@ -39,7 +39,7 @@ void testCompletionTime() .endStatus() .build(); - JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(job); Assertions.assertEquals(58000L, response.getJobDuration()); } @@ -56,7 +56,7 @@ void testNoDuration() .endStatus() .build(); - JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(job); Assertions.assertEquals(-1, response.getJobDuration()); } @@ -70,7 +70,7 @@ void testMakingCodeCoverageHappy() .endMetadata() .build(); - JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(job); Assertions.assertEquals(-1, response.getJobDuration()); } @@ -78,7 +78,7 @@ void testMakingCodeCoverageHappy() @Test void testNullJob() { - JobResponse response = new JobResponse(null, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(null); long duration = response.getJobDuration(); Assertions.assertEquals(-1, duration); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index f6096b675d6c..cde7faa473bc 100644 --- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -153,7 +153,6 @@ void test_waitForPeonJobCompletion_withSuccessfulJob_returnsJobResponseWithJobAn TimeUnit.SECONDS ); - Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase()); Assertions.assertNotNull(jobResponse.getJob()); } @@ -178,7 +177,6 @@ void test_waitForPeonJobCompletion_withFailedJob_returnsJobResponseWithJobAndFai TimeUnit.SECONDS ); - Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase()); Assertions.assertNotNull(jobResponse.getJob()); } @@ -191,7 +189,6 @@ void test_waitforPeonJobCompletion_withoutRunningJob_returnsJobResponseWithEmpty TimeUnit.SECONDS ); - Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase()); Assertions.assertNull(jobResponse.getJob()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java deleted file mode 100644 index 3f6bd71312be..000000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.k8s.overlord.common; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodStatus; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class PeonPhaseTest -{ - - @Test - void testGetPhaseForToMakeCoverageHappy() - { - Pod pod = mock(Pod.class); - PodStatus status = mock(PodStatus.class); - when(status.getPhase()).thenReturn("Succeeded"); - when(pod.getStatus()).thenReturn(status); - assertEquals(PeonPhase.UNKNOWN, PeonPhase.getPhaseFor(null)); - assertEquals(PeonPhase.SUCCEEDED, PeonPhase.getPhaseFor(pod)); - } -} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 098161685883..2cc5bf15c65a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -38,13 +38,11 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; import 
org.apache.druid.k8s.overlord.common.DruidKubernetesClient; -import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesClientApi; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.common.PeonCommandContext; -import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; import org.junit.jupiter.api.BeforeEach; @@ -184,9 +182,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception assertEquals(task, taskFromPod); - JobResponse jobStatusResult = peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES); + peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES); thread.join(); - assertEquals(PeonPhase.SUCCEEDED, jobStatusResult.getPhase()); // as long as there were no exceptions we are good! 
assertEquals(expectedLogs, actualLogs); // cleanup my job diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml index 30960cdbc668..741a032eb6c9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml @@ -62,4 +62,4 @@ spec: ephemeral-storage: 1Gi hostname: "id-3e70afe5cd823dfc7dd308eea616426b" restartPolicy: "Never" - ttlSecondsAfterFinished: 172800 \ No newline at end of file + ttlSecondsAfterFinished: 3600 \ No newline at end of file diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml index 70b8b7c1d242..73f31ddfb508 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml @@ -105,4 +105,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml index 70b8b7c1d242..73f31ddfb508 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml @@ -105,4 +105,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 172800 + 
ttlSecondsAfterFinished: 3600 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml index 2cef837f3972..004fed9585af 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml index cf16c49c5db1..b6ca8a2cefe6 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml index a230ac913a60..547887e90847 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml index e46de1337883..ecd9416c563a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml @@ -104,4 +104,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml index f270368fb552..7afc393c56af 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml @@ -57,4 +57,4 @@ spec: cpu: "1000m" memory: "2400000000" restartPolicy: "Never" - ttlSecondsAfterFinished: 172800 \ No newline at end of file + ttlSecondsAfterFinished: 3600 \ No newline at end of file diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index e5b6ab1b7312..ea0cb566b2ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -25,13 +25,11 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import 
org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.actions.UpdateLocationAction; import org.apache.druid.indexing.common.actions.UpdateStatusAction; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -42,14 +40,12 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.segment.indexing.BatchIOConfig; -import org.apache.druid.server.DruidNode; import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; @@ -155,11 +151,6 @@ public String setup(TaskToolbox toolbox) throws Exception FileUtils.mkdirp(attemptDir); reportsFile = new File(attemptDir, "report.json"); statusFile = new File(attemptDir, "status.json"); - InetAddress hostName = InetAddress.getLocalHost(); - DruidNode node = toolbox.getTaskExecutorNode(); - toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.create( - hostName.getHostAddress(), node.getPlaintextPort(), node.getTlsPort(), node.isEnablePlaintextPort() - ))); } log.debug("Task setup complete"); return null; @@ -211,7 +202,6 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception status = new UpdateStatusAction("failure"); } toolbox.getTaskActionClient().submit(status); - toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.unknown())); if (reportsFile != null && reportsFile.exists()) { toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 27909aea83cb..3891747281c7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -358,6 +358,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception { startTime = DateTimes.nowUtc(); status = Status.STARTING; + log.info("Starting [%s] [%s]", task.getId(), status); setToolbox(toolbox); @@ -384,7 +385,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception initializeSequences(); - log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName()); + log.info("Found chat handler of class[%s] [%s]", toolbox.getChatHandlerProvider().getClass().getName(), task.getId()); toolbox.getChatHandlerProvider().register(task.getId(), this, false); runThread = Thread.currentThread(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 2f6cb008b842..0f18feb86a7d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2113,6 +2113,12 @@ public Boolean apply(Pair { + log.info("Got status for [%s]", taskId); if (status == SeekableStreamIndexTaskRunner.Status.PUBLISHING) { return FutureUtils.transform( taskClient.getEndOffsetsAsync(taskId), diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java 
b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java index 802be54cc115..0a56ee109dc2 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java @@ -66,7 +66,7 @@ public void register(final String service, ChatHandler handler) @Override public void register(final String service, ChatHandler handler, boolean announce) { - log.debug("Registering Eventhandler[%s]", service); + log.info("Registering Eventhandler[%s]", service); if (handlers.putIfAbsent(service, handler) != null) { throw new ISE("handler already registered for service[%s]", service); @@ -89,7 +89,7 @@ public void register(final String service, ChatHandler handler, boolean announce @Override public void unregister(final String service) { - log.debug("Unregistering chat handler[%s]", service); + log.info("Unregistering chat handler[%s]", service); final ChatHandler handler = handlers.get(service); if (handler == null) { From 107bf8f0f17c66a7e9cda24ae4d83f7807687477 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 9 Oct 2023 09:55:09 -0400 Subject: [PATCH 02/12] Remove extra log lines --- .../druid/k8s/overlord/KubernetesTaskRunner.java | 12 +++++------- .../SeekableStreamIndexTaskRunner.java | 3 +-- .../supervisor/SeekableStreamSupervisor.java | 7 ------- .../ServiceAnnouncingChatHandlerProvider.java | 4 ++-- 4 files changed, 8 insertions(+), 18 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 439a1d94effe..c71aabe8c957 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -176,7 +176,7 @@ private TaskStatus joinTask(Task task) @VisibleForTesting protected TaskStatus doTask(Task task, boolean run) { - TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task Execution not started"); + TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started"); try { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( task, @@ -210,7 +210,7 @@ protected TaskStatus doTask(Task task, boolean run) } catch (Exception e) { log.error(e, "Task [%s] execution caught an exception", task.getId()); - taskStatus = TaskStatus.failure(task.getId(), "Execution while starting task execution"); + taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution"); throw new RuntimeException(e); } finally { @@ -250,12 +250,10 @@ public void updateStatus(Task task, TaskStatus status) { KubernetesWorkItem workItem = tasks.get(task.getId()); if (workItem != null && !workItem.getResult().isDone()) { - log.info("Manually calling update status for [%s]", task.getId()); workItem.setResult(status); } - // Notify all listeners by default - log.info("Notifying listeners [%s]", task.getId()); + // Notify listeners even if the result is set to handle the shutdown case. 
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); } @@ -407,7 +405,7 @@ public void unregisterListener(String listenerId) for (Pair pair : listeners) { if (pair.lhs != null && pair.lhs.getListenerId().equals(listenerId)) { listeners.remove(pair); - log.info("Unregistered listener [%s]", listenerId); + log.debug("Unregistered listener [%s]", listenerId); return; } } @@ -423,7 +421,7 @@ public void registerListener(TaskRunnerListener listener, Executor executor) } final Pair listenerPair = Pair.of(listener, executor); - log.info("Registered listener [%s]", listener.getListenerId()); + log.debug("Registered listener [%s]", listener.getListenerId()); listeners.add(listenerPair); for (Map.Entry entry : tasks.entrySet()) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 3891747281c7..27909aea83cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -358,7 +358,6 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception { startTime = DateTimes.nowUtc(); status = Status.STARTING; - log.info("Starting [%s] [%s]", task.getId(), status); setToolbox(toolbox); @@ -385,7 +384,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception initializeSequences(); - log.info("Found chat handler of class[%s] [%s]", toolbox.getChatHandlerProvider().getClass().getName(), task.getId()); + log.debug("Found chat handler of class[%s]", toolbox.getChatHandlerProvider().getClass().getName()); toolbox.getChatHandlerProvider().register(task.getId(), this, false); runThread = Thread.currentThread(); diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0f18feb86a7d..2f6cb008b842 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2113,12 +2113,6 @@ public Boolean apply(Pair { - log.info("Got status for [%s]", taskId); if (status == SeekableStreamIndexTaskRunner.Status.PUBLISHING) { return FutureUtils.transform( taskClient.getEndOffsetsAsync(taskId), diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java index 0a56ee109dc2..802be54cc115 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ServiceAnnouncingChatHandlerProvider.java @@ -66,7 +66,7 @@ public void register(final String service, ChatHandler handler) @Override public void register(final String service, ChatHandler handler, boolean announce) { - log.info("Registering Eventhandler[%s]", service); + log.debug("Registering Eventhandler[%s]", service); if (handlers.putIfAbsent(service, handler) != null) { throw new ISE("handler already registered for service[%s]", service); @@ -89,7 +89,7 @@ public void register(final String service, ChatHandler handler, boolean announce @Override public void unregister(final String service) { - log.info("Unregistering chat handler[%s]", service); + log.debug("Unregistering chat handler[%s]", service); final ChatHandler handler = handlers.get(service); if (handler == null) { From 
d7b82cf625a6c4d8ce6e4903a76f8e4b33ada1a6 Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 10 Oct 2023 10:29:44 -0400 Subject: [PATCH 03/12] Fix unit tests --- .../k8s/overlord/KubernetesPeonLifecycleTest.java | 14 ++++++++++---- .../test/resources/expectedNoopJobNoTaskJson.yaml | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 25f7039bf57d..a5e579de0e90 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -233,7 +233,8 @@ public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { @Override @@ -315,13 +316,15 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException @Test public void test_join() throws IOException { + Executor executor = EasyMock.mock(Executor.class); + TaskRunnerListener taskRunnerListener = EasyMock.mock(TaskRunnerListener.class); KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, kubernetesClient, taskLogs, mapper, stateListener, - listeners + ImmutableList.of(Pair.of(taskRunnerListener, executor)) ); Job job = new JobBuilder() @@ -356,6 +359,10 @@ public void test_join() throws IOException EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); + executor.execute(EasyMock.anyObject()); + taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, 
peonLifecycle.getState()); @@ -577,7 +584,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) ); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -636,7 +642,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml index d72d0ef37b03..8ecdaf50b012 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: From 7962f27c84549a520daa4139def64953542c51a8 Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 11 Oct 2023 10:08:55 -0400 Subject: [PATCH 04/12] fix unit tests --- .../k8s/overlord/KubernetesTaskRunner.java | 2 +- .../overlord/KubernetesPeonLifecycleTest.java | 2 +- .../overlord/KubernetesTaskRunnerTest.java | 77 +++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index c71aabe8c957..8256059fa9e4 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -249,7 +249,7 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String public void updateStatus(Task task, TaskStatus status) { KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem != null && !workItem.getResult().isDone()) { + if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) { workItem.setResult(status); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index a5e579de0e90..e984e449b282 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -360,8 +360,8 @@ public void test_join() throws IOException logWatch.close(); EasyMock.expectLastCall(); executor.execute(EasyMock.anyObject()); - taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall(); + taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 30a8e47928c0..4e7564abcdaa 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; @@ -77,6 +78,9 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle; @Mock private ServiceEmitter emitter; + @Mock private Executor executor; + @Mock private TaskRunnerListener taskRunnerListener; + private KubernetesTaskRunnerConfig config; private KubernetesTaskRunner runner; private Task task; @@ -268,7 +272,80 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws Excepti ListenableFuture future = runner.run(task); TaskStatus taskStatus = future.get(); Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); + Assert.assertEquals("Could not start task execution", taskStatus.getErrorMsg()); + verifyAll(); + } + + @Test + public void test_run_updateStatus() throws ExecutionException, InterruptedException { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ); + + KubernetesWorkItem 
workItem = new KubernetesWorkItem(task); + runner.tasks.put(task.getId(), workItem); + TaskStatus completeTaskStatus = TaskStatus.success(task.getId()); + + replayAll(); + runner.updateStatus(task, completeTaskStatus); + verifyAll(); + + assertTrue(workItem.getResult().isDone()); + assertEquals(completeTaskStatus, workItem.getResult().get()); + } + + @Test + public void test_run_updateStatus_running() { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); + runner.tasks.put(task.getId(), workItem); + TaskStatus runningTaskStatus = TaskStatus.running(task.getId()); + replayAll(); + runner.updateStatus(task, runningTaskStatus); + verifyAll(); + + assertFalse(workItem.getResult().isDone()); + } + + @Test + public void test_registerListener_runningTask() { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ); + + KubernetesPeonLifecycle kubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class); + EasyMock.expect(kubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); + workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + runner.tasks.put(task.getId(), workItem); + + Executor executor = EasyMock.mock(Executor.class); + TaskRunnerListener taskRunnerListener = EasyMock.mock(TaskRunnerListener.class); + executor.execute(EasyMock.anyObject()); + EasyMock.expectLastCall(); + taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall(); + + replayAll(); + runner.registerListener(taskRunnerListener, executor); verifyAll(); } From f4f3309d7ff2a2a611ceae53838ee2eace452da0 Mon Sep 17 00:00:00 2001 From: 
George Wu Date: Wed, 11 Oct 2023 12:29:42 -0400 Subject: [PATCH 05/12] Fix unit tests --- .../druid/k8s/overlord/KubernetesTaskRunnerTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 4e7564abcdaa..62aec1c8fb17 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -277,7 +277,8 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws Excepti } @Test - public void test_run_updateStatus() throws ExecutionException, InterruptedException { + public void test_run_updateStatus() throws ExecutionException, InterruptedException + { KubernetesTaskRunner runner = new KubernetesTaskRunner( taskAdapter, config, @@ -300,7 +301,8 @@ public void test_run_updateStatus() throws ExecutionException, InterruptedExcept } @Test - public void test_run_updateStatus_running() { + public void test_run_updateStatus_running() + { KubernetesTaskRunner runner = new KubernetesTaskRunner( taskAdapter, config, @@ -321,7 +323,8 @@ public void test_run_updateStatus_running() { } @Test - public void test_registerListener_runningTask() { + public void test_registerListener_runningTask() + { KubernetesTaskRunner runner = new KubernetesTaskRunner( taskAdapter, config, @@ -333,6 +336,7 @@ public void test_registerListener_runningTask() { KubernetesPeonLifecycle kubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class); EasyMock.expect(kubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING); + 
EasyMock.expect(kubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown()); KubernetesWorkItem workItem = new KubernetesWorkItem(task); workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); @@ -345,8 +349,10 @@ public void test_registerListener_runningTask() { EasyMock.expectLastCall(); replayAll(); + EasyMock.replay(kubernetesPeonLifecycle); runner.registerListener(taskRunnerListener, executor); verifyAll(); + EasyMock.verify(kubernetesPeonLifecycle); } @Test From b0af37c270185b3089bedf179186987d1812f77e Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 11 Oct 2023 14:50:18 -0400 Subject: [PATCH 06/12] notify listeners on task completion --- .../java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 8256059fa9e4..280114463d5e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -215,6 +215,7 @@ protected TaskStatus doTask(Task task, boolean run) } finally { updateStatus(task, taskStatus); + TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown()); } } From 7f68cb321b86b69d1da4af4775b59da28be3c224 Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 11 Oct 2023 15:54:51 -0400 Subject: [PATCH 07/12] Fix unit test --- .../org/apache/druid/indexing/common/task/AbstractTaskTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index 5bcadcfb7125..39b0bdfcfc50 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -109,7 +109,7 @@ public String setup(TaskToolbox toolbox) throws Exception task.run(toolbox); // call it 3 times, once to update location in setup, then one for status and location in cleanup - Mockito.verify(taskActionClient, times(3)).submit(any()); + Mockito.verify(taskActionClient, times(1)).submit(any()); verify(pusher, times(1)).pushTaskReports(eq("myID"), any()); verify(pusher, times(1)).pushTaskStatus(eq("myID"), any()); } From 5bc3b11c02f62ec7545a26f93c0fbe3f498f5f78 Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 12 Oct 2023 09:49:07 -0400 Subject: [PATCH 08/12] unused var --- .../org/apache/druid/k8s/overlord/KubernetesTaskRunner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 280114463d5e..56de37c37980 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -147,7 +147,7 @@ public ListenableFuture run(Task task) { synchronized (tasks) { return tasks.computeIfAbsent(task.getId(), k -> { - exec.submit(() -> runTask(task)); + ListenableFuture unused = exec.submit(() -> runTask(task)); return new KubernetesWorkItem(task); }).getResult(); } @@ -157,7 +157,7 @@ protected ListenableFuture joinAsync(Task task) { synchronized (tasks) { return 
tasks.computeIfAbsent(task.getId(), k -> { - exec.submit(() -> joinTask(task)); + ListenableFuture unused = exec.submit(() -> joinTask(task)); return new KubernetesWorkItem(task); }).getResult(); } From 8ddda62ac3ad3177284f66e1775f1267765f003e Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 13 Oct 2023 16:08:54 -0400 Subject: [PATCH 09/12] PR changes --- .../k8s/overlord/KubernetesPeonLifecycle.java | 5 +- .../overlord/KubernetesTaskRunnerTest.java | 12 ++-- .../common/actions/UpdateLocationAction.java | 5 +- .../druid/indexing/overlord/TaskRunner.java | 5 -- .../actions/UpdateLocationActionTest.java | 71 ------------------- 5 files changed, 9 insertions(+), 89 deletions(-) delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index cb09decc9d5d..97f5d04c0206 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -203,12 +203,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio finally { try { saveLogs(); - shutdown(); } catch (Exception e) { - log.warn(e, "Cleanup failed for task [%s]", taskId); + log.warn(e, "Log processing failed for task [%s]", taskId); } - + shutdown(); stopTask(); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 62aec1c8fb17..e04ef6300362 100644 --- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -334,11 +334,11 @@ public void test_registerListener_runningTask() emitter ); - KubernetesPeonLifecycle kubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class); - EasyMock.expect(kubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING); - EasyMock.expect(kubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown()); + KubernetesPeonLifecycle runningKubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class); + EasyMock.expect(runningKubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING); + EasyMock.expect(runningKubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown()); KubernetesWorkItem workItem = new KubernetesWorkItem(task); - workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + workItem.setKubernetesPeonLifecycle(runningKubernetesPeonLifecycle); runner.tasks.put(task.getId(), workItem); Executor executor = EasyMock.mock(Executor.class); @@ -349,10 +349,10 @@ public void test_registerListener_runningTask() EasyMock.expectLastCall(); replayAll(); - EasyMock.replay(kubernetesPeonLifecycle); + EasyMock.replay(runningKubernetesPeonLifecycle); runner.registerListener(taskRunnerListener, executor); verifyAll(); - EasyMock.verify(kubernetesPeonLifecycle); + EasyMock.verify(runningKubernetesPeonLifecycle); } @Test diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java index f4926864dcbe..ef1f20dddbd3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunner; +@Deprecated public class UpdateLocationAction implements TaskAction { @JsonIgnore @@ -58,10 +59,6 @@ public TypeReference getReturnTypeReference() @Override public Void perform(Task task, TaskActionToolbox toolbox) { - Optional taskRunner = toolbox.getTaskRunner(); - if (taskRunner.isPresent()) { - taskRunner.get().updateLocation(task, taskLocation); - } return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index ac1fd124ef55..99b0c05b8323 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -141,11 +141,6 @@ default void updateStatus(Task task, TaskStatus status) // do nothing } - default void updateLocation(Task task, TaskLocation location) - { - // do nothing - } - /** * The maximum number of tasks this TaskRunner can run concurrently. * Can return -1 if this method is not implemented or capacity can't be found. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java deleted file mode 100644 index 83aeb382dc0a..000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.google.common.base.Optional; -import org.apache.druid.indexer.TaskLocation; -import org.apache.druid.indexing.common.task.NoopTask; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunner; -import org.junit.Test; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class UpdateLocationActionTest -{ - @Test - public void testFlow() throws UnknownHostException - { - // get my task location - InetAddress hostName = InetAddress.getLocalHost(); - TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 1, 2); - UpdateLocationAction action = new UpdateLocationAction(myLocation); - Task task = NoopTask.create(); - TaskActionToolbox toolbox = mock(TaskActionToolbox.class); - TaskRunner runner = mock(TaskRunner.class); - when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner)); - action.perform(task, toolbox); - verify(runner, times(1)).updateLocation(eq(task), eq(myLocation)); - } - - @Test - public void 
testWithNoTaskRunner() throws UnknownHostException - { - // get my task location - InetAddress hostName = InetAddress.getLocalHost(); - TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 1, 2); - UpdateLocationAction action = new UpdateLocationAction(myLocation); - Task task = NoopTask.create(); - TaskActionToolbox toolbox = mock(TaskActionToolbox.class); - TaskRunner runner = mock(TaskRunner.class); - when(toolbox.getTaskRunner()).thenReturn(Optional.absent()); - action.perform(task, toolbox); - verify(runner, never()).updateStatus(any(), any()); - } -} From 9ca3f91dbd59b6c514a19a97a6e54a6bc75ad51e Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 16 Oct 2023 08:15:34 -0700 Subject: [PATCH 10/12] Fix unit tests --- .../k8s/overlord/KubernetesAndWorkerTaskRunner.java | 6 ------ .../k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java | 9 --------- 2 files changed, 15 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java index 243f6626c664..8c41772aea69 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java @@ -283,10 +283,4 @@ public void updateStatus(Task task, TaskStatus status) { kubernetesTaskRunner.updateStatus(task, status); } - - @Override - public void updateLocation(Task task, TaskLocation location) - { - kubernetesTaskRunner.updateLocation(task, location); - } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java index af5a6c39bb0b..80010b9e5396 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java @@ -332,13 +332,4 @@ public void test_updateStatus() runner.updateStatus(task, TaskStatus.running(ID)); verifyAll(); } - - @Test - public void test_updateLocation() - { - kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown()); - replayAll(); - runner.updateLocation(task, TaskLocation.unknown()); - verifyAll(); - } } From 3ae0327cafa6308ec481a5ce532ad63ffe8301d6 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 16 Oct 2023 08:35:59 -0700 Subject: [PATCH 11/12] Fix checkstyle --- .../druid/indexing/common/actions/UpdateLocationAction.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java index ef1f20dddbd3..c823f612b81f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java @@ -23,10 +23,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Optional; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunner; @Deprecated public class UpdateLocationAction implements TaskAction From cf05797d0ce7413e941d13b6a6a401bc301d2b0e Mon Sep 
17 00:00:00 2001 From: George Wu Date: Mon, 16 Oct 2023 12:02:38 -0700 Subject: [PATCH 12/12] PR changes --- .../druid/k8s/overlord/KubernetesPeonLifecycle.java | 8 +++++--- .../org/apache/druid/k8s/overlord/KubernetesWorkItem.java | 2 ++ .../indexing/common/actions/UpdateLocationAction.java | 5 +++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 97f5d04c0206..ea3c2a19d1c2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -203,12 +203,14 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio finally { try { saveLogs(); + shutdown(); } catch (Exception e) { - log.warn(e, "Log processing failed for task [%s]", taskId); + log.warn(e, "Cleanup failed for task [%s]", taskId); + } + finally { + stopTask(); } - shutdown(); - stopTask(); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index ca8779d69b52..b089b4dd2db0 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -19,6 +19,7 @@ package org.apache.druid.k8s.overlord; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; 
import com.google.common.util.concurrent.SettableFuture; @@ -43,6 +44,7 @@ public KubernetesWorkItem(Task task) this(task, SettableFuture.create()); } + @VisibleForTesting public KubernetesWorkItem(Task task, SettableFuture result) { super(task.getId(), result); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java index c823f612b81f..088169d3a537 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java @@ -26,6 +26,11 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexing.common.task.Task; +/* This class was added for mm-less ingestion in order to let the peon manage its own location lifecycle by submitting +actions to the overlord. https://github.com/apache/druid/pull/15133 moved this location logic to the overlord itself, +so this action is no longer needed. For backwards compatibility with old peons, this class was left in; it is deprecated +and can be removed in a later Druid release. +*/ @Deprecated public class UpdateLocationAction implements TaskAction {