Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Stop datafeeds running when their jobs are stale #37227

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public static PersistentTasksCustomMetaData.PersistentTask<?> getDatafeedTask(St
return tasks == null ? null : tasks.getTask(datafeedTaskId(datafeedId));
}

/**
* Note that the return value of this method does NOT take node relocations into account.
* Use {@link #getJobStateModifiedForReassignments} to return a value adjusted to the most
* appropriate value following relocations.
*/
public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getJobTask(jobId, tasks);
if (task != null) {
Expand All @@ -68,6 +73,36 @@ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustom
return JobState.CLOSED;
}

public static JobState getJobStateModifiedForReassignments(String jobId, @Nullable PersistentTasksCustomMetaData tasks) {
return getJobStateModifiedForReassignments(getJobTask(jobId, tasks));
}

public static JobState getJobStateModifiedForReassignments(@Nullable PersistentTasksCustomMetaData.PersistentTask<?> task) {
if (task == null) {
// A closed job has no persistent task
return JobState.CLOSED;
}
JobTaskState jobTaskState = (JobTaskState) task.getState();
if (jobTaskState == null) {
return JobState.OPENING;
}
JobState jobState = jobTaskState.getState();
if (jobTaskState.isStatusStale(task)) {
// the job is re-locating
if (jobState == JobState.CLOSING) {
// previous executor node failed while the job was closing - it won't
// be reopened on another node, so consider it CLOSED for most purposes
return JobState.CLOSED;
}
if (jobState != JobState.FAILED) {
// previous executor node failed and current executor node didn't
// have the chance to set job status to OPENING
return JobState.OPENING;
}
}
return jobState;
}

public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) {
PersistentTasksCustomMetaData.PersistentTask<?> task = getDatafeedTask(datafeedId, tasks);
if (task != null && task.getState() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ public JobState getState() {
return state;
}

/**
* The job state stores the allocation ID at the time it was last set.
* This method compares the allocation ID in the state with the allocation
* ID in the task. If the two are different then the task has been relocated
* to a different node after the last time the state was set. This in turn
* means that the state is not necessarily correct. For example, a job that
* has a state of OPENED but is stale must be considered to be OPENING, because
* it won't yet have a corresponding autodetect process.
* @param task The job task to check.
* @return Has the task been relocated to another node and not had its status set since then?
*/
public boolean isStatusStale(PersistentTask<?> task) {
return allocationId != task.getAllocationId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,31 +225,13 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> assignedTasks = persistentTasks.findTasks(
MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
for (PersistentTasksCustomMetaData.PersistentTask<?> assignedTask : assignedTasks) {
JobTaskState jobTaskState = (JobTaskState) assignedTask.getState();
JobState jobState;
if (jobTaskState == null) {
// executor node didn't have the chance to set job status to OPENING
++numberOfAllocatingJobs;
jobState = JobState.OPENING;
} else {
jobState = jobTaskState.getState();
if (jobTaskState.isStatusStale(assignedTask)) {
// the job is re-locating
if (jobState == JobState.CLOSING) {
// previous executor node failed while the job was closing - it won't
// be reopened, so consider it CLOSED for resource usage purposes
jobState = JobState.CLOSED;
} else if (jobState != JobState.FAILED) {
// previous executor node failed and current executor node didn't
// have the chance to set job status to OPENING
++numberOfAllocatingJobs;
jobState = JobState.OPENING;
}
}
}
JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
// Don't count CLOSED or FAILED jobs, as they don't consume native memory
++numberOfAssignedJobs;
if (jobState == JobState.OPENING) {
++numberOfAllocatingJobs;
}
OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
Long jobMemoryRequirement = memoryTracker.getJobMemoryRequirement(params.getJobId());
if (jobMemoryRequirement == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void onFailure(Exception e) {
protected void doRun() {
Long next = null;
try {
next = holder.executeLoopBack(startTime, endTime);
next = holder.executeLookBack(startTime, endTime);
} catch (DatafeedJob.ExtractionProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
Expand Down Expand Up @@ -253,7 +253,7 @@ private String getJobId(TransportStartDatafeedAction.DatafeedTask task) {
}

private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStartDatafeedAction.DatafeedTask datafeedTask) {
return MlTasks.getJobState(getJobId(datafeedTask), tasks);
return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks);
}

private TimeValue computeNextDelay(long next) {
Expand All @@ -272,7 +272,7 @@ public class Holder {
private final TransportStartDatafeedAction.DatafeedTask task;
private final long allocationId;
private final String datafeedId;
// To ensure that we wait until loopback / realtime search has completed before we stop the datafeed
// To ensure that we wait until lookback / realtime search has completed before we stop the datafeed
private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
private final DatafeedJob datafeedJob;
private final boolean autoCloseJob;
Expand Down Expand Up @@ -352,7 +352,7 @@ public void setRelocating() {
isRelocating = true;
}

private Long executeLoopBack(long startTime, Long endTime) throws Exception {
private Long executeLookBack(long startTime, Long endTime) throws Exception {
datafeedJobLock.lock();
try {
if (isRunning() && !isIsolated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,16 @@ public void testJobTaskMatcherMatch() {
}

public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
addJobTask(jobId, nodeId, jobState, builder, false);
}

public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder,
boolean isStale) {
builder.addTask(MlTasks.jobTaskId(jobId), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId),
new Assignment(nodeId, "test assignment"));
new Assignment(nodeId, "test assignment"));
if (jobState != null) {
builder.updateTaskState(MlTasks.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId()));
builder.updateTaskState(MlTasks.jobTaskId(jobId),
new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception {
assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true));
}

public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception {
public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception {
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenReturn(1L);
when(datafeedJob.runRealtime()).thenReturn(1L);

Expand Down Expand Up @@ -282,8 +282,45 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}

public void testDatafeedTaskWaitsUntilJobIsNotStale() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());

Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);

// Verify datafeed has not started running yet as job is stale (i.e. even though opened it is part way through relocating)
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true);
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));

// Still no run
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

capturedClusterStateListener.getValue().clusterChanged(
new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));

// Now it should run as the job state chanded to OPENED
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}

public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
Expand All @@ -296,7 +333,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
// Verify datafeed has not started running yet as job is still opening
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder);
ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
Expand All @@ -309,7 +346,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() {
}

public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
Expand All @@ -326,7 +363,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() {
datafeedManager.stopDatafeed(task, "test", StopDatafeedAction.DEFAULT_TIMEOUT);

// Update job state to opened
tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
Expand Down