Skip to content

Commit

Permalink
Restore tasks when lifecycle start (#14909)
Browse files Browse the repository at this point in the history
* K8s tasks restore should be from lifecycle start

* add test

* add more tests

* fix test

* wait tasks restore finish when start

* fix style

* revert previous change and add comment
  • Loading branch information
YongGang authored Sep 22, 2023
1 parent 5cee9f6 commit be3f93e
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -60,7 +61,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -309,23 +309,25 @@ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new ArrayList<>();
return ImmutableList.of();
}

@Override
@LifecycleStart
public void start()
{
log.info("Starting K8sTaskRunner...");
// Load tasks from previously running jobs and wait for their statuses to be updated asynchronously.
for (Job job : client.getPeonJobs()) {
try {
Task task = adapter.toTask(job);
restoredTasks.add(Pair.of(task, joinAsync(task)));
joinAsync(adapter.toTask(job));
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
}
}
return restoredTasks;
}
log.info("Loaded %,d tasks from previous run", tasks.size());

@Override
@LifecycleStart
public void start()
{
cleanupExecutor.scheduleAtFixedRate(
() ->
client.deleteCompletedPeonJobsOlderThan(
Expand All @@ -339,7 +341,6 @@ public void start()
log.debug("Started cleanup executor for jobs older than %s...", config.getTaskCleanupDelay());
}


@Override
@LifecycleStop
public void stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
Expand All @@ -56,7 +55,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -101,6 +99,89 @@ public void setup()
);
}

/**
 * Starts the runner while the peon client reports one existing job and checks that
 * exactly one entry ends up in the runner's {@code tasks} map.
 */
@Test
public void test_start_withExistingJobs() throws IOException
{
  final Job existingJob = new JobBuilder()
      .withNewMetadata()
      .withName(ID)
      .endMetadata()
      .build();

  // joinAsync is stubbed so that joining a task only records a work item whose
  // result is an already-completed successful status.
  final KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
      taskAdapter,
      config,
      peonClient,
      httpClient,
      new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
      emitter
  )
  {
    @Override
    protected ListenableFuture<TaskStatus> joinAsync(Task task)
    {
      return tasks
          .computeIfAbsent(
              task.getId(),
              ignored -> new KubernetesWorkItem(task, Futures.immediateFuture(TaskStatus.success(task.getId())))
          )
          .getResult();
    }
  };

  // The client lists the job, and the adapter turns it into the fixture task.
  EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(existingJob));
  EasyMock.expect(taskAdapter.toTask(existingJob)).andReturn(task);
  replayAll();

  taskRunner.start();
  verifyAll();

  Assert.assertNotNull(taskRunner.tasks);
  Assert.assertEquals(1, taskRunner.tasks.size());
}

/**
 * Starts the runner while the task adapter throws an {@link IOException} for the only
 * reported job, and checks that {@code start()} completes with an empty {@code tasks} map.
 */
@Test
public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException
{
  final Job undeserializableJob = new JobBuilder()
      .withNewMetadata()
      .withName(ID)
      .endMetadata()
      .build();

  // joinAsync is stubbed to record a work item in the tasks map, so any call to it
  // would show up in the final size assertion.
  final KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
      taskAdapter,
      config,
      peonClient,
      httpClient,
      new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
      emitter
  )
  {
    @Override
    protected ListenableFuture<TaskStatus> joinAsync(Task task)
    {
      return tasks
          .computeIfAbsent(task.getId(), ignored -> new KubernetesWorkItem(task, null))
          .getResult();
    }
  };

  // The client lists the job, but converting it to a task fails.
  EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(undeserializableJob));
  EasyMock.expect(taskAdapter.toTask(undeserializableJob)).andThrow(new IOException());
  replayAll();

  taskRunner.start();
  verifyAll();

  Assert.assertNotNull(taskRunner.tasks);
  Assert.assertEquals(0, taskRunner.tasks.size());
}

@Test
public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional()
{
Expand Down Expand Up @@ -263,80 +344,6 @@ public void test_shutdown_withoutExistingTask()
runner.shutdown(task.getId(), "");
}

/**
 * Calls {@code restore()} while the peon client reports one existing job that the
 * adapter converts successfully, and expects a single entry in the returned list.
 */
@Test
public void test_restore_withExistingJobs() throws IOException
{
  final Job existingJob = new JobBuilder()
      .withNewMetadata()
      .withName(ID)
      .endMetadata()
      .build();

  // joinAsync is stubbed to hand back the result of a fresh work item built with a
  // null second constructor argument.
  final KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
      taskAdapter,
      config,
      peonClient,
      httpClient,
      new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
      emitter
  )
  {
    @Override
    protected ListenableFuture<TaskStatus> joinAsync(Task task)
    {
      final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
      return workItem.getResult();
    }
  };

  EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(existingJob));
  EasyMock.expect(taskAdapter.toTask(existingJob)).andReturn(task);
  replayAll();

  final List<Pair<Task, ListenableFuture<TaskStatus>>> restored = taskRunner.restore();
  verifyAll();

  Assert.assertNotNull(restored);
  Assert.assertEquals(1, restored.size());
}

/**
 * Calls {@code restore()} while the task adapter throws an {@link IOException} for the
 * only reported job, and expects an empty (non-null) list back.
 */
@Test
public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws IOException
{
  final Job undeserializableJob = new JobBuilder()
      .withNewMetadata()
      .withName(ID)
      .endMetadata()
      .build();

  // joinAsync is stubbed to hand back the result of a fresh work item built with a
  // null second constructor argument.
  final KubernetesTaskRunner taskRunner = new KubernetesTaskRunner(
      taskAdapter,
      config,
      peonClient,
      httpClient,
      new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
      emitter
  )
  {
    @Override
    protected ListenableFuture<TaskStatus> joinAsync(Task task)
    {
      final KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
      return workItem.getResult();
    }
  };

  // The client lists the job, but converting it to a task fails.
  EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(undeserializableJob));
  EasyMock.expect(taskAdapter.toTask(undeserializableJob)).andThrow(new IOException());
  replayAll();

  final List<Pair<Task, ListenableFuture<TaskStatus>>> restored = taskRunner.restore();
  verifyAll();

  Assert.assertNotNull(restored);
  Assert.assertEquals(0, restored.size());
}

@Test
public void test_getTotalTaskSlotCount()
{
Expand Down

0 comments on commit be3f93e

Please sign in to comment.