Skip to content

Commit

Permalink
delete jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
georgew5656 committed Aug 11, 2023
1 parent ea6aef0 commit a966ebf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
Expand Down Expand Up @@ -322,7 +323,8 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
String taskId = adapter.getTaskId(job);
Task task = tasksFromStorage.get(taskId);
if (task == null) {
log.warn("Found K8s job running task id %s that was not in taskStorage during restore, ignoring it.", taskId);
log.warn("Found K8s job running task id %s that was not in taskStorage during restore, deleting it.", taskId);
client.deletePeonJob(new K8sTaskId(taskId));
continue;
}
restoredTasks.add(Pair.of(tasksFromStorage.get(taskId), joinAsync(task)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.easymock.EasyMock;
Expand Down Expand Up @@ -366,6 +367,46 @@ protected ListenableFuture<TaskStatus> joinAsync(Task task)
Assert.assertEquals(0, tasks.size());
}

@Test
public void test_restore_missingFromTaskStorage_cleanupJob() throws IOException
{
KubernetesTaskRunner runner = new KubernetesTaskRunner(
taskAdapter,
config,
peonClient,
httpClient,
new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
emitter,
taskStorage
) {
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
return new KubernetesWorkItem(task, null).getResult();
}
};

Job job = new JobBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.build();

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.getTaskId(job)).andReturn(ID);
EasyMock.expect(peonClient.deletePeonJob(new K8sTaskId(ID))).andReturn(true);
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of());

replayAll();

List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = runner.restore();

verifyAll();

Assert.assertNotNull(tasks);
Assert.assertEquals(0, tasks.size());
}

@Test
public void test_getTotalTaskSlotCount()
{
Expand Down

0 comments on commit a966ebf

Please sign in to comment.