Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate task lifecycle from kubernetes/location lifecycle #15133

Merged
merged 12 commits into from
Oct 17, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand All @@ -47,6 +50,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -89,6 +94,8 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
private final List<Pair<TaskRunnerListener, Executor>> listeners;

@MonotonicNonNull
private LogWatch logWatch;

Expand All @@ -99,7 +106,8 @@ protected KubernetesPeonLifecycle(
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
TaskStateListener stateListener
TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners
)
{
this.taskId = new K8sTaskId(task);
Expand All @@ -108,6 +116,7 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.listeners = listeners;
}

/**
Expand Down Expand Up @@ -178,7 +187,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

TaskRunnerUtils.notifyLocationChanged(
listeners,
task.getId(),
getTaskLocation()
);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
Expand All @@ -190,9 +203,10 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
finally {
try {
saveLogs();
shutdown();
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
}
catch (Exception e) {
log.warn(e, "Log processing failed for task [%s]", taskId);
log.warn(e, "Cleanup failed for task [%s]", taskId);
}

stopTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.List;
import java.util.concurrent.Executor;

public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
{
private final KubernetesPeonClient client;
Expand All @@ -42,14 +47,15 @@ public KubernetesPeonLifecycleFactory(
}

@Override
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
mapper,
stateListener
stateListener,
listeners
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,20 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
exec.submit(() -> runTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
exec.submit(() -> joinTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

Expand All @@ -172,10 +176,12 @@ private TaskStatus joinTask(Task task)
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit weird to initialize status with a failure one, don't think we need it.

try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
this::emitTaskStateMetrics,
listeners
);

synchronized (tasks) {
Expand All @@ -188,7 +194,6 @@ protected TaskStatus doTask(Task task, boolean run)
workItem.setKubernetesPeonLifecycle(peonLifecycle);
}

TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
Expand All @@ -201,15 +206,17 @@ protected TaskStatus doTask(Task task, boolean run)
config.getTaskTimeout().toStandardDuration().getMillis()
);
}

updateStatus(task, taskStatus);

return taskStatus;
}
catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution");
throw new RuntimeException(e);
}
finally {
updateStatus(task, taskStatus);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown());
}
}

@VisibleForTesting
Expand Down Expand Up @@ -242,13 +249,13 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String
@Override
public void updateStatus(Task task, TaskStatus status)
{
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}
KubernetesWorkItem workItem = tasks.get(task.getId());
if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) {
workItem.setResult(status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
// Notify listeners even if the result is set to handle the shutdown case.
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

@Override
Expand Down Expand Up @@ -417,6 +424,16 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
log.debug("Registered listener [%s]", listener.getListenerId());
listeners.add(listenerPair);

for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
if (entry.getValue().isRunning()) {
TaskRunnerUtils.notifyLocationChanged(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableList.of(listenerPair),
entry.getKey(),
entry.getValue().getLocation()
);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig
@JsonProperty
@NotNull
// how long to wait for the jobs to be cleaned up.
private Period taskCleanupDelay = new Period("P2D");
private Period taskCleanupDelay = new Period("PT1H");

@JsonProperty
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -36,9 +36,17 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
private final Task task;
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;

public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
private final SettableFuture<TaskStatus> result;

public KubernetesWorkItem(Task task)
{
this(task, SettableFuture.create());
}

public KubernetesWorkItem(Task task, SettableFuture<TaskStatus> result)
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
{
super(task.getId(), statusFuture);
super(task.getId(), result);
this.result = result;
this.task = task;
}

Expand All @@ -51,7 +59,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k
protected synchronized void shutdown()
{

if (this.kubernetesPeonLifecycle != null) {
if (this.kubernetesPeonLifecycle != null && !result.isDone()) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
}
Expand Down Expand Up @@ -119,4 +127,9 @@ public Task getTask()
{
return task;
}

public void setResult(TaskStatus status)
{
result.set(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.apache.druid.k8s.overlord;

import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;

import java.util.List;
import java.util.concurrent.Executor;

public interface PeonLifecycleFactory
{
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,17 @@ public class JobResponse
private static final EmittingLogger LOGGER = new EmittingLogger(JobResponse.class);

private final Job job;
private final PeonPhase phase;

public JobResponse(@Nullable Job job, PeonPhase phase)
public JobResponse(@Nullable Job job)
{
this.job = job;
this.phase = phase;
}

public Job getJob()
{
return job;
}

public PeonPhase getPhase()
{
return phase;
}

public long getJobDuration()
{
long duration = -1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time
);
if (job == null) {
log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId);
return new JobResponse(null, PeonPhase.FAILED);
return new JobResponse(null);
}
if (job.getStatus().getSucceeded() != null) {
return new JobResponse(job, PeonPhase.SUCCEEDED);
return new JobResponse(job);
}
log.warn("Task %s failed with status %s", taskId, job.getStatus());
return new JobResponse(job, PeonPhase.FAILED);
return new JobResponse(job);
});
}

Expand Down

This file was deleted.

Loading
Loading