Skip to content

Commit

Permalink
Fix exposing inconsistency in job status outside of persistence API
Browse files Browse the repository at this point in the history
Within job launch logic there was a helper method which would query the job status and based on the returned value proceed with some logic to either update it or fall back to other logic. This works ok if all requests to the persistence service implementation go to a single cosistent backend. If, however, read only queries go to a read replica which may have lag or some other implementation entirely this breaks down without the service actually knowing why or how.

Moving the logic for this behind the persistence API and letting the launch service only act the returned job status from the source of truth api should fix this problem.
  • Loading branch information
tgianos committed Feb 9, 2022
1 parent 3a23b13 commit ea261c8
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,20 +422,15 @@ void canUpdateJobStatus() throws GenieCheckedException, IOException {

Assertions.assertThat(jobEntity.getStatus()).isEqualTo(JobStatus.CLAIMED.name());

// status won't match so it will throw exception
// status won't match so it won't update
Assertions
.assertThatExceptionOfType(GenieInvalidStatusException.class)
.isThrownBy(
() -> this.service.updateJobStatus(
jobId,
JobStatus.RUNNING,
JobStatus.FAILED,
null
)
);
.assertThat(this.service.updateJobStatus(jobId, JobStatus.RUNNING, JobStatus.FAILED, null))
.isEqualTo(JobStatus.CLAIMED);

final String initStatusMessage = "Job is initializing";
this.service.updateJobStatus(jobId, JobStatus.CLAIMED, JobStatus.INIT, initStatusMessage);
Assertions
.assertThat(this.service.updateJobStatus(jobId, JobStatus.CLAIMED, JobStatus.INIT, initStatusMessage))
.isEqualTo(JobStatus.INIT);

jobEntity = this.jobRepository
.findByUniqueId(jobId)
Expand All @@ -448,7 +443,9 @@ void canUpdateJobStatus() throws GenieCheckedException, IOException {
Assertions.assertThat(jobEntity.getFinished()).isNotPresent();

final String runningStatusMessage = "Job is running";
this.service.updateJobStatus(jobId, JobStatus.INIT, JobStatus.RUNNING, runningStatusMessage);
Assertions
.assertThat(this.service.updateJobStatus(jobId, JobStatus.INIT, JobStatus.RUNNING, runningStatusMessage))
.isEqualTo(JobStatus.RUNNING);

jobEntity = this.jobRepository
.findByUniqueId(jobId)
Expand All @@ -459,15 +456,17 @@ void canUpdateJobStatus() throws GenieCheckedException, IOException {
Assertions.assertThat(jobEntity.getStarted()).isPresent();
Assertions.assertThat(jobEntity.getFinished()).isNotPresent();

final String successStatusMessage = "Job completed successfully";
this.service.updateJobStatus(jobId, JobStatus.RUNNING, JobStatus.SUCCEEDED, successStatusMessage);
final String successMessage = "Job completed successfully";
Assertions
.assertThat(this.service.updateJobStatus(jobId, JobStatus.RUNNING, JobStatus.SUCCEEDED, successMessage))
.isEqualTo(JobStatus.SUCCEEDED);

jobEntity = this.jobRepository
.findByUniqueId(jobId)
.orElseThrow(IllegalArgumentException::new);

Assertions.assertThat(jobEntity.getStatus()).isEqualTo(JobStatus.SUCCEEDED.name());
Assertions.assertThat(jobEntity.getStatusMsg()).isPresent().contains(successStatusMessage);
Assertions.assertThat(jobEntity.getStatusMsg()).isPresent().contains(successMessage);
Assertions.assertThat(jobEntity.getStarted()).isPresent();
Assertions.assertThat(jobEntity.getFinished()).isPresent();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,23 +695,22 @@ void claimJob(
* of the job matches {@code newStatus}. Optionally a status message can be provided to provide more details to
* users. If the {@code newStatus} is {@link JobStatus#RUNNING} the start time will be set. If the {@code newStatus}
* is a member of {@link JobStatus#getFinishedStatuses()} and the job had a started time set the finished time of
* the job will be set.
* the job will be set. If the {@literal currentStatus} is different from what the source of truth thinks this
* function will skip the update and just return the current source of truth value.
*
* @param id The id of the job to update status for. Must exist in the system.
* @param currentStatus The status the caller to this API thinks the job currently has
* @param newStatus The new status the caller would like to update the status to
* @param newStatusMessage An optional status message to associate with this change
* @throws NotFoundException if no job with the given {@code id} exists
* @throws GenieInvalidStatusException if the current status of the job identified by {@code id} in the system
* doesn't match the supplied {@code currentStatus}.
* Also if the {@code currentStatus} equals the {@code newStatus}.
* @return The job status in the source of truth
* @throws NotFoundException if no job with the given {@code id} exists
*/
void updateJobStatus(
JobStatus updateJobStatus(
@NotBlank String id,
@NotNull JobStatus currentStatus,
@NotNull JobStatus newStatus,
@Nullable String newStatusMessage
) throws NotFoundException, GenieInvalidStatusException;
) throws NotFoundException;

/**
* Update the status and status message of the job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1765,12 +1765,12 @@ public void claimJob(
* {@inheritDoc}
*/
@Override
public void updateJobStatus(
public JobStatus updateJobStatus(
@NotBlank final String id,
@NotNull final JobStatus currentStatus,
@NotNull final JobStatus newStatus,
@Nullable final String newStatusMessage
) throws NotFoundException, GenieInvalidStatusException {
) throws NotFoundException {
log.debug(
"[updateJobStatus] Requested to change the status of job {} from {} to {} with message {}",
id,
Expand All @@ -1779,41 +1779,61 @@ public void updateJobStatus(
newStatusMessage
);
if (currentStatus == newStatus) {
throw new GenieInvalidStatusException(
"Can't update the status of job " + id + " because both current and new status are " + currentStatus
log.debug(
"[updateJobStatus] Requested new status for {} is same as current status: {}. Skipping update.",
id,
currentStatus
);
return newStatus;
}

final JobEntity jobEntity = this.getJobEntity(id);

final JobStatus actualCurrentStatus = DtoConverters.toV4JobStatus(jobEntity.getStatus());
if (actualCurrentStatus != currentStatus) {
throw new GenieInvalidStatusException(
"Job "
+ id
+ " current status is "
+ actualCurrentStatus
+ " but API caller expected it to be "
+ currentStatus
+ ". Unable to update status due to inconsistent state."
log.warn(
"[updateJobStatus] Job {} actual status {} differs from expected status {}. Skipping update.",
id,
actualCurrentStatus,
currentStatus
);
return actualCurrentStatus;
}

// TODO: Should we throw an exception if the job is already in a terminal state and someone is trying to
// further update it? In the private method below used in Genie 3 it's just swallowed and is a no-op

// TODO: Should we prevent updating status for statuses already covered by "reserveJobId" and
// "saveResolvedJob"?

this.updateJobStatus(jobEntity, newStatus, newStatusMessage);
// Only change the status if the entity isn't already in a terminal state
if (actualCurrentStatus.isActive()) {
jobEntity.setStatus(newStatus.name());
jobEntity.setStatusMsg(StringUtils.truncate(newStatusMessage, MAX_STATUS_MESSAGE_LENGTH));

log.debug(
"[updateJobStatus] Changed the status of job {} from {} to {} with message {}",
id,
currentStatus,
newStatus,
newStatusMessage
);
if (newStatus.equals(JobStatus.RUNNING)) {
// Status being changed to running so set start date.
jobEntity.setStarted(Instant.now());
} else if (jobEntity.getStarted().isPresent() && newStatus.isFinished()) {
// Since start date is set the job was running previously and now has finished
// with status killed, failed or succeeded. So we set the job finish time.
jobEntity.setFinished(Instant.now());
}

log.debug(
"[updateJobStatus] Changed the status of job {} from {} to {} with message {}",
id,
currentStatus,
newStatus,
newStatusMessage
);

return newStatus;
} else {
log.warn(
"[updateJobStatus] Job status for {} is already terminal state {}. Skipping update.",
id,
actualCurrentStatus
);
return actualCurrentStatus;
}
}

/**
Expand Down Expand Up @@ -2615,28 +2635,6 @@ private void updateClusterCriteria(final CommandEntity commandEntity, final List
);
}

private void updateJobStatus(
final JobEntity jobEntity,
final JobStatus newStatus,
@Nullable final String statusMsg
) {
final JobStatus currentStatus = DtoConverters.toV4JobStatus(jobEntity.getStatus());
// Only change the status if the entity isn't already in a terminal state
if (currentStatus.isActive()) {
jobEntity.setStatus(newStatus.name());
jobEntity.setStatusMsg(StringUtils.truncate(statusMsg, MAX_STATUS_MESSAGE_LENGTH));

if (newStatus.equals(JobStatus.RUNNING)) {
// Status being changed to running so set start date.
jobEntity.setStarted(Instant.now());
} else if (jobEntity.getStarted().isPresent() && newStatus.isFinished()) {
// Since start date is set the job was running previously and now has finished
// with status killed, failed or succeeded. So we set the job finish time.
jobEntity.setFinished(Instant.now());
}
}
}

private void setJobMetadataFields(
final JobEntity jobEntity,
final JobMetadata jobMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.netflix.genie.common.external.dtos.v4.ArchiveStatus;
import com.netflix.genie.common.external.dtos.v4.JobStatus;
import com.netflix.genie.common.internal.exceptions.checked.GenieJobResolutionException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieInvalidStatusException;
import com.netflix.genie.common.internal.tracing.brave.BraveTracingComponents;
import com.netflix.genie.web.agent.launchers.AgentLauncher;
import com.netflix.genie.web.data.services.DataServices;
Expand Down Expand Up @@ -149,7 +148,10 @@ public String launchJob(

MetricsUtils.addFailureTagsWithException(tags, t);
this.persistenceService.updateJobArchiveStatus(jobId, ArchiveStatus.NO_FILES);
if (this.updateJobStatus(jobId, JobStatus.FAILED, message, INITIAL_ATTEMPT) != JobStatus.FAILED) {
if (
this.updateJobStatus(jobId, JobStatus.RESERVED, JobStatus.FAILED, message, INITIAL_ATTEMPT)
!= JobStatus.FAILED
) {
log.error("Updating status to failed didn't succeed");
}
throw t; // Caught below for metrics gathering
Expand All @@ -161,6 +163,7 @@ public String launchJob(
try {
final JobStatus updatedStatus = this.updateJobStatus(
jobId,
JobStatus.RESOLVED,
JobStatus.ACCEPTED,
ACCEPTED_MESSAGE,
INITIAL_ATTEMPT
Expand All @@ -186,7 +189,7 @@ public String launchJob(
launcherExt = launcher.launchAgent(resolvedJob, requestedLauncherExt);
} catch (final AgentLaunchException e) {
this.persistenceService.updateJobArchiveStatus(jobId, ArchiveStatus.NO_FILES);
this.updateJobStatus(jobId, JobStatus.FAILED, e.getMessage(), INITIAL_ATTEMPT);
this.updateJobStatus(jobId, JobStatus.ACCEPTED, JobStatus.FAILED, e.getMessage(), INITIAL_ATTEMPT);
// TODO: How will we get the ID back to the user? Should we add it to an exception? We don't get
// We don't get the ID until after saveJobSubmission so if that fails we'd still return nothing
// Probably need multiple exceptions to be thrown from this API (if we go with checked)
Expand Down Expand Up @@ -276,12 +279,18 @@ private AgentLauncher selectLauncher(
*/
private JobStatus updateJobStatus(
final String jobId,
final JobStatus expectedStatus,
final JobStatus desiredStatus,
final String desiredStatusMessage,
final int attemptNumber
) throws NotFoundException {
final int nextAttemptNumber = attemptNumber + 1;
final JobStatus currentStatus = this.persistenceService.getJobStatus(jobId);
final JobStatus currentStatus = this.persistenceService.updateJobStatus(
jobId,
expectedStatus,
desiredStatus,
desiredStatusMessage
);
if (currentStatus.isFinished()) {
log.info(
"Won't change job status of {} from {} to {} desired status as {} is already a final status",
Expand All @@ -291,32 +300,36 @@ private JobStatus updateJobStatus(
currentStatus
);
return currentStatus;
} else if (currentStatus == desiredStatus) {
log.debug("Successfully updated status of {} from {} to {}", jobId, expectedStatus, desiredStatus);
return currentStatus;
} else {
try {
this.persistenceService.updateJobStatus(jobId, currentStatus, desiredStatus, desiredStatusMessage);
return desiredStatus;
} catch (final GenieInvalidStatusException e) {
log.error(
"Job {} status changed from expected {}. Couldn't update to {}. Attempt {}",
log.error(
"Job {} status changed from expected {} to {}. Couldn't update to {}. Attempt {}",
jobId,
expectedStatus,
currentStatus,
desiredStatus,
nextAttemptNumber
);
// Recursive call that should break out if update is successful or job is now in a final state
// or if attempts reach the max attempts
if (nextAttemptNumber < MAX_STATUS_UPDATE_ATTEMPTS) {
return this.updateJobStatus(
jobId,
currentStatus,
desiredStatus,
desiredStatusMessage,
nextAttemptNumber
);
// Recursive call that should break out if update is successful or job is now in a final state
// or if attempts reach the max attempts
if (nextAttemptNumber < MAX_STATUS_UPDATE_ATTEMPTS) {
return this.updateJobStatus(jobId, desiredStatus, desiredStatusMessage, attemptNumber + 1);
} else {
// breakout condition, stop attempting to update DB
log.error(
"Out of attempts to update job {} status to {}. Unable to complete status update",
jobId,
desiredStatus,
e
);
return currentStatus;
}
} else {
// breakout condition, stop attempting to update DB
log.error(
"Out of attempts to update job {} status to {}. Unable to complete status update",
jobId,
desiredStatus
);
return currentStatus;
}
}
}
Expand Down
Loading

0 comments on commit ea261c8

Please sign in to comment.