Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix k8s task runner failure reporting #15311

Merged
merged 4 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,23 @@ public class UpdateStatusAction implements TaskAction<Void>
{
@JsonIgnore
private final String status;
private final TaskStatus statusFull;
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved

public UpdateStatusAction(
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
String status
)
{
this(status, null);
}

@JsonCreator
public UpdateStatusAction(
@JsonProperty("status") String status
@JsonProperty("status") String status,
@JsonProperty("statusFull") TaskStatus statusFull
)
{
this.status = status;
this.statusFull = statusFull;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If status is deprecated and callers are expected to use statusFull, can we add validation in the constructor to make sure only one of the two fields is set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not add the extra validation, since it's principally a JSON constructor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine.

My thinking was that, in the perform method, either status or statusFull is used. So if a caller calls this method with both set, it's likely a mistake, and they won't know about it unless they closely inspect the results of the perform method.

}


Expand All @@ -50,6 +60,12 @@ public String getStatus()
return status;
}

/**
 * Returns the full task status carried by this action.
 *
 * <p>Exposed to Jackson via {@code @JsonProperty}, so it is serialized under the
 * {@code statusFull} property. The value is {@code null} when the action was built
 * through the single-argument constructor, which only supplies the status string.
 *
 * @return the full {@link TaskStatus}, or {@code null} if none was provided
 */
@JsonProperty
public TaskStatus getStatusFull()
{
  return this.statusFull;
}

@Override
public TypeReference<Void> getReturnTypeReference()
{
Expand All @@ -63,9 +79,8 @@ public Void perform(Task task, TaskActionToolbox toolbox)
{
Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
if (taskRunner.isPresent()) {
TaskStatus result = "successful".equals(status)
? TaskStatus.success(task.getId())
: TaskStatus.failure(task.getId(), "Error with task");
// Fall back to checking status instead of statusFull for backwards compatibility
TaskStatus result = statusFull != null ? statusFull : "successful".equals(status) ? TaskStatus.success(task.getId()) : TaskStatus.failure(task.getId(), "Error with task");
taskRunner.get().updateStatus(task, result);
}
return null;
Expand All @@ -82,6 +97,7 @@ public String toString()
{
return "UpdateStatusAction{" +
"status=" + status +
"statusFull=" + statusFull +
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
'}';
}

Expand All @@ -95,12 +111,12 @@ public boolean equals(Object o)
return false;
}
UpdateStatusAction that = (UpdateStatusAction) o;
return Objects.equals(status, that.status);
return Objects.equals(status, that.status) && Objects.equals(statusFull, that.statusFull);
}

@Override
public int hashCode()
{
return Objects.hash(status);
return Objects.hash(status, statusFull);
}
georgew5656 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,13 @@ public String setup(TaskToolbox toolbox) throws Exception
@Override
public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
{
TaskStatus taskStatus = TaskStatus.running(getId());
TaskStatus taskStatus = null;
try {
cleanupCompletionLatch = new CountDownLatch(1);
String errorMessage = setup(taskToolbox);
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
return TaskStatus.failure(getId(), errorMessage);
taskStatus = TaskStatus.failure(getId(), errorMessage);
return taskStatus;
}
taskStatus = runTask(taskToolbox);
return taskStatus;
Expand All @@ -186,7 +187,7 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;

@Override
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although there is no harm in annotating this as nullable, I don't think taskStatus can actually be null here, since it is set to a value on every code path.

{
// clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
Thread.interrupted();
Expand All @@ -196,11 +197,11 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
return;
}

TaskStatus taskStatusToReport = taskStatus == null
? TaskStatus.failure(id, "Task failed to run")
: taskStatus;
// report back to the overlord
UpdateStatusAction status = new UpdateStatusAction("successful");
if (taskStatus.isFailure()) {
status = new UpdateStatusAction("failure");
}
UpdateStatusAction status = new UpdateStatusAction("", taskStatus);
toolbox.getTaskActionClient().submit(status);

if (reportsFile != null && reportsFile.exists()) {
Expand All @@ -211,7 +212,7 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
}

if (statusFile != null) {
toolbox.getJsonMapper().writeValue(statusFile, taskStatus);
toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
Files.deleteIfExists(statusFile.toPath());
log.debug("Pushed task status");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ public void testFailureScenario()
verify(runner, times(1)).updateStatus(eq(task), eq(TaskStatus.failure(task.getId(), "Error with task")));
}

/**
 * Verifies that when an {@link UpdateStatusAction} is built with a full {@link TaskStatus},
 * {@code perform} hands exactly that status to the task runner.
 */
@Test
public void testTaskStatusFull()
{
  // Toolbox whose task runner is a mock we can verify against.
  final TaskRunner taskRunner = mock(TaskRunner.class);
  final TaskActionToolbox actionToolbox = mock(TaskActionToolbox.class);
  when(actionToolbox.getTaskRunner()).thenReturn(Optional.of(taskRunner));

  final Task noopTask = NoopTask.create();
  final TaskStatus expectedStatus = TaskStatus.failure(noopTask.getId(), "custom error message");

  new UpdateStatusAction("failure", expectedStatus).perform(noopTask, actionToolbox);

  // The runner must receive the supplied status object, custom message included.
  verify(taskRunner, times(1)).updateStatus(eq(noopTask), eq(expectedStatus));
}

@Test
public void testNoTaskRunner()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,94 @@ public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Except
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);

TaskStatus taskStatus = TaskStatus.failure("myId", "failed");
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
{
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
return TaskStatus.failure("myId", "failed");
return taskStatus;
}
};
task.run(toolbox);
UpdateStatusAction action = new UpdateStatusAction("failure");
UpdateStatusAction action = new UpdateStatusAction("", taskStatus);
verify(taskActionClient).submit(eq(action));
}

/**
 * Verifies that when {@code runTask} returns {@code null}, running the task submits an
 * {@link UpdateStatusAction} carrying a failure status with the message "Task failed to run".
 */
@Test
public void testNullStackStatusGetsReportedCorrectly() throws Exception
{
  // Collaborators that the task reaches through its toolbox.
  final TaskToolbox taskToolbox = mock(TaskToolbox.class);
  final TaskLogPusher logPusher = mock(TaskLogPusher.class);
  final TaskConfig taskConfig = mock(TaskConfig.class);
  final TaskActionClient actionClient = mock(TaskActionClient.class);
  final File taskDir = temporaryFolder.newFolder();
  final DruidNode executorNode = new DruidNode("foo", "foo", false, 1, 2, true, true);

  // Task config: encapsulated task with a real temporary task directory.
  when(taskConfig.isEncapsulatedTask()).thenReturn(true);
  when(taskConfig.getTaskDir(eq("myID"))).thenReturn(taskDir);

  when(actionClient.submit(any())).thenReturn(TaskConfig.class);

  // Wire everything into the toolbox.
  when(taskToolbox.getAttemptId()).thenReturn("1");
  when(taskToolbox.getTaskExecutorNode()).thenReturn(executorNode);
  when(taskToolbox.getTaskLogPusher()).thenReturn(logPusher);
  when(taskToolbox.getConfig()).thenReturn(taskConfig);
  when(taskToolbox.getJsonMapper()).thenReturn(objectMapper);
  when(taskToolbox.getTaskActionClient()).thenReturn(actionClient);

  final AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
  {
    @Nullable
    @Override
    public TaskStatus runTask(TaskToolbox ignored)
    {
      // Simulate the scenario where taskStatus is never set and cleanUp is called with null.
      return null;
    }
  };

  task.run(taskToolbox);

  final UpdateStatusAction expectedAction =
      new UpdateStatusAction("", TaskStatus.failure(task.getId(), "Task failed to run"));
  verify(actionClient).submit(eq(expectedAction));
}

/**
 * Verifies that when {@code setup} returns an error message, running the task submits an
 * {@link UpdateStatusAction} carrying a failure status with that same message.
 */
@Test
public void testSetupFailsGetsReportedCorrectly() throws Exception
{
  // Collaborators that the task reaches through its toolbox.
  final TaskToolbox taskToolbox = mock(TaskToolbox.class);
  final TaskLogPusher logPusher = mock(TaskLogPusher.class);
  final TaskConfig taskConfig = mock(TaskConfig.class);
  final TaskActionClient actionClient = mock(TaskActionClient.class);
  final File taskDir = temporaryFolder.newFolder();
  final DruidNode executorNode = new DruidNode("foo", "foo", false, 1, 2, true, true);

  // Task config: encapsulated task with a real temporary task directory.
  when(taskConfig.isEncapsulatedTask()).thenReturn(true);
  when(taskConfig.getTaskDir(eq("myID"))).thenReturn(taskDir);

  when(actionClient.submit(any())).thenReturn(TaskConfig.class);

  // Wire everything into the toolbox.
  when(taskToolbox.getAttemptId()).thenReturn("1");
  when(taskToolbox.getTaskExecutorNode()).thenReturn(executorNode);
  when(taskToolbox.getTaskLogPusher()).thenReturn(logPusher);
  when(taskToolbox.getConfig()).thenReturn(taskConfig);
  when(taskToolbox.getJsonMapper()).thenReturn(objectMapper);
  when(taskToolbox.getTaskActionClient()).thenReturn(actionClient);

  final AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
  {
    @Nullable
    @Override
    public String setup(TaskToolbox ignored)
    {
      // A non-blank return value from setup is the error being simulated.
      return "setup error";
    }
  };

  task.run(taskToolbox);

  final UpdateStatusAction expectedAction =
      new UpdateStatusAction("", TaskStatus.failure(task.getId(), "setup error"));
  verify(actionClient).submit(eq(expectedAction));
}


@Test
public void testBatchIOConfigAppend()
{
Expand Down
Loading