Skip to content

Commit

Permalink
Finish deprecating toTask
Browse files Browse the repository at this point in the history
  • Loading branch information
georgew5656 committed Aug 10, 2023
1 parent 0b0559b commit ea6aef0
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -317,10 +316,7 @@ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new ArrayList<>();
Map<String, Task> tasksFromStorage = new HashMap<>();
for (Task task: taskStorage.getActiveTasks()) {
tasksFromStorage.put(task.getId(), task);
}
Map<String, Task> tasksFromStorage = taskStorage.getActiveTasks().stream().collect(Collectors.toMap(Task::getId, task -> task));
for (Job job : client.getPeonJobs()) {
try {
String taskId = adapter.getTaskId(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
Expand All @@ -51,7 +50,6 @@
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesClientApi;
Expand Down Expand Up @@ -125,21 +123,8 @@ public Job fromTask(Task task) throws IOException
}

@Override
public Task toTask(Job from) throws IOException
public String getTaskId(Job from) throws IOException
{
PodSpec podSpec = from.getSpec().getTemplate().getSpec();
massageSpec(podSpec, "main");
List<EnvVar> envVars = podSpec.getContainers().get(0).getEnv();
Optional<EnvVar> taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
if (contents == null) {
throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
}
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
}

@Override
public String getTaskId(Job from) throws IOException {
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
import io.fabric8.kubernetes.api.model.ObjectFieldSelector;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
Expand Down Expand Up @@ -148,32 +147,9 @@ public Job fromTask(Task task) throws IOException
.build();
}

/**
* Transform a {@link Pod} to a {@link Task}
*
* 1. Find task annotation on the pod
* 2. Base 64 decode and decompress task, read into {@link Task}
*
* @param from
* @return {@link Task}
* @throws IOException
*/
@Override
public Task toTask(Job from) throws IOException
public String getTaskId(Job from) throws IOException
{
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
}
String task = annotations.get(DruidK8sConstants.TASK);
if (task == null) {
throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return mapper.readValue(Base64Compression.decompressBase64(task), Task.class);
}

@Override
public String getTaskId(Job from) throws IOException {
Map<String, String> annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public interface TaskAdapter

Job fromTask(Task task) throws IOException;

Task toTask(Job from) throws IOException;

String getTaskId(Job from) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ protected ListenableFuture<TaskStatus> joinAsync(Task task)
.build();

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID);
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(task));

replayAll();

Expand Down Expand Up @@ -352,7 +353,8 @@ protected ListenableFuture<TaskStatus> joinAsync(Task task)
.build();

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of());
EasyMock.expect(taskAdapter.getTaskId(job)).andThrow(new IOException());

replayAll();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception
thread.start();

// assert that the env variable is corret
Task taskFromEnvVar = adapter.toTask(job);
assertEquals(task, taskFromEnvVar);
String taskIdFromEnvVar = adapter.getTaskId(job);
assertEquals(task.getId(), taskIdFromEnvVar);

// now copy the task.json file from the pod and make sure its the same as our task.json we expected
Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public void serializingAndDeserializingATask() throws IOException
assertEquals(2400000000L, Long.valueOf(amount));
assertTrue(StringUtils.isBlank(containerMemory.getFormat())); // no units specified we talk in bytes

Task taskFromJob = adapter.toTask(Iterables.getOnlyElement(jobList.getItems()));
assertEquals(task, taskFromJob);
String taskId = adapter.getTaskId(Iterables.getOnlyElement(jobList.getItems()));
assertEquals(task.getId(), taskId);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce
}

@Test
public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
public void test_getTaskId_withoutAnnotations_throwsIOE() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Expand All @@ -261,11 +261,11 @@ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);


Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job));
}

@Test
public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException
public void test_getTaskId_withoutTaskAnnotation_throwsIOE() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
Expand All @@ -292,31 +292,7 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException
.endTemplate()
.endSpec()
.build();
Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
}

@Test
public void test_fromTask() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);

Properties props = new Properties();
props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());

PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
taskRunnerConfig,
taskConfig,
node,
mapper,
props
);

Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
Task actual = adapter.toTask(job);
Task expected = NoopTask.create("id", 1);

Assertions.assertEquals(expected, actual);
Assert.assertThrows(IOE.class, () -> adapter.getTaskId(job));
}

@Test
Expand Down

0 comments on commit ea6aef0

Please sign in to comment.