Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When restoring tasks from kubernetes, compare to tasks in task storage to get the Task object. #14802

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
Expand All @@ -48,6 +49,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
Expand Down Expand Up @@ -106,6 +108,7 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
private final HttpClient httpClient;
private final PeonLifecycleFactory peonLifecycleFactory;
private final ServiceEmitter emitter;
private final TaskStorage taskStorage;


public KubernetesTaskRunner(
Expand All @@ -114,7 +117,8 @@ public KubernetesTaskRunner(
KubernetesPeonClient client,
HttpClient httpClient,
PeonLifecycleFactory peonLifecycleFactory,
ServiceEmitter emitter
ServiceEmitter emitter,
TaskStorage taskStorage
)
{
this.adapter = adapter;
Expand All @@ -127,6 +131,7 @@ public KubernetesTaskRunner(
Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
);
this.emitter = emitter;
this.taskStorage = taskStorage;
}

@Override
Expand Down Expand Up @@ -312,10 +317,17 @@ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new ArrayList<>();
Map<String, Task> tasksFromStorage = taskStorage.getActiveTasks().stream().collect(Collectors.toMap(Task::getId, task -> task));
for (Job job : client.getPeonJobs()) {
try {
Task task = adapter.toTask(job);
restoredTasks.add(Pair.of(task, joinAsync(task)));
String taskId = adapter.getTaskId(job);
Task task = tasksFromStorage.get(taskId);
if (task == null) {
log.warn("Found K8s job running task id %s that was not in taskStorage during restore, deleting it.", taskId);
client.deletePeonJob(new K8sTaskId(taskId));
continue;
}
restoredTasks.add(Pair.of(tasksFromStorage.get(taskId), joinAsync(task)));
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.overlord.TaskRunnerFactory;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory<Kubernetes
private final DruidKubernetesClient druidKubernetesClient;
private final ServiceEmitter emitter;
private KubernetesTaskRunner runner;
private TaskStorage taskStorage;


@Inject
Expand All @@ -70,7 +72,8 @@ public KubernetesTaskRunnerFactory(
TaskConfig taskConfig,
Properties properties,
DruidKubernetesClient druidKubernetesClient,
ServiceEmitter emitter
ServiceEmitter emitter,
TaskStorage taskStorage
)
{
this.smileMapper = smileMapper;
Expand All @@ -83,6 +86,7 @@ public KubernetesTaskRunnerFactory(
this.properties = properties;
this.druidKubernetesClient = druidKubernetesClient;
this.emitter = emitter;
this.taskStorage = taskStorage;
}

@Override
Expand All @@ -101,7 +105,8 @@ public KubernetesTaskRunner build()
peonClient,
httpClient,
new KubernetesPeonLifecycleFactory(peonClient, taskLogs, smileMapper),
emitter
emitter,
taskStorage
);
return runner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Expand All @@ -48,9 +47,9 @@
import org.apache.druid.indexing.overlord.QuotableWhiteSpaceSplitter;
import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesClientApi;
Expand Down Expand Up @@ -124,17 +123,17 @@ public Job fromTask(Task task) throws IOException
}

@Override
public Task toTask(Job from) throws IOException
public String getTaskId(Job from) throws IOException
{
PodSpec podSpec = from.getSpec().getTemplate().getSpec();
massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
if (contents == null) {
throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
}
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
String taskId = annotations.get(DruidK8sConstants.TASK_ID);
if (taskId == null) {
throw new IOE("No task.id annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return taskId;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
Expand Down Expand Up @@ -148,28 +147,18 @@ public Job fromTask(Task task) throws IOException
.build();
}

/**
* Transform a {@link Pod} to a {@link Task}
*
* 1. Find task annotation on the pod
* 2. Base 64 decode and decompress task, read into {@link Task}
*
* @param from
* @return {@link Task}
* @throws IOException
*/
@Override
public Task toTask(Job from) throws IOException
public String getTaskId(Job from) throws IOException
{
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
}
String task = annotations.get(DruidK8sConstants.TASK);
if (task == null) {
throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName());
String taskId = annotations.get(DruidK8sConstants.TASK_ID);
if (taskId == null) {
throw new IOE("No task.id annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return mapper.readValue(Base64Compression.decompressBase64(task), Task.class);
return taskId;
}

private HashMap<String, PodTemplate> initializePodTemplates(Properties properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public interface TaskAdapter

Job fromTask(Task task) throws IOException;

Task toTask(Job from) throws IOException;
String getTaskId(Job from) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems implementations for this method are the same in diff subclasses, can make it default method implementation.


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
Expand Down Expand Up @@ -54,6 +55,8 @@ public class KubernetesTaskRunnerFactoryTest
private DruidKubernetesClient druidKubernetesClient;
@Mock private ServiceEmitter emitter;

@Mock private TaskStorage taskStorage;

@Before
public void setup()
{
Expand Down Expand Up @@ -90,7 +93,8 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild()
taskConfig,
properties,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

KubernetesTaskRunner expectedRunner = factory.build();
Expand All @@ -112,7 +116,8 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing
taskConfig,
properties,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

KubernetesTaskRunner runner = factory.build();
Expand All @@ -139,7 +144,8 @@ public void test_build_withSidecarSupport_returnsKubernetesTaskRunnerWithMultiCo
taskConfig,
properties,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

KubernetesTaskRunner runner = factory.build();
Expand All @@ -164,7 +170,8 @@ public void test_build_withSingleContainerAdapterType_returnsKubernetesTaskRunne
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

KubernetesTaskRunner runner = factory.build();
Expand Down Expand Up @@ -194,7 +201,8 @@ public void test_build_withSingleContainerAdapterTypeAndSidecarSupport_throwsIAE
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

Assert.assertThrows(
Expand Down Expand Up @@ -225,7 +233,8 @@ public void test_build_withMultiContainerAdapterType_returnsKubernetesTaskRunner
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

KubernetesTaskRunner runner = factory.build();
Expand All @@ -250,7 +259,8 @@ public void test_build_withMultiContainerAdapterTypeAndSidecarSupport_returnsKub
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

KubernetesTaskRunner runner = factory.build();
Expand Down Expand Up @@ -278,7 +288,8 @@ public void test_build_withPodTemplateAdapterType_returnsKubernetesTaskRunnerWit
taskConfig,
props,
druidKubernetesClient,
emitter
emitter,
taskStorage
);

KubernetesTaskRunner runner = factory.build();
Expand Down
Loading