From e0ce73713f7292954ff66691919abf62d71442cd Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 9 Jan 2019 10:42:47 +0000 Subject: [PATCH] [ML] Stop datafeeds running when their jobs are stale (#37227) We already had logic to stop datafeeds running against jobs that were OPENING, but a job that relocates from one node to another while OPENED stays OPENED, and this could cause the datafeed to fail when it sent data to the OPENED job on its new node before it had a corresponding autodetect process. This change extends the check to stop datafeeds running when their job is OPENING _or_ stale (i.e. has not had its status reset since relocating to a different node). Relates #36810 --- .../elasticsearch/xpack/core/ml/MlTasks.java | 35 ++++++++++++++ .../core/ml/job/config/JobTaskState.java | 11 +++++ .../ml/action/TransportOpenJobAction.java | 26 ++-------- .../xpack/ml/datafeed/DatafeedManager.java | 8 ++-- .../action/TransportOpenJobActionTests.java | 10 +++- .../ml/datafeed/DatafeedManagerTests.java | 47 +++++++++++++++++-- 6 files changed, 104 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index e78649d152296..b81a1f7d7b9c0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -55,6 +55,11 @@ public static PersistentTasksCustomMetaData.PersistentTask getDatafeedTask(St return tasks == null ? null : tasks.getTask(datafeedTaskId(datafeedId)); } + /** + * Note that the return value of this method does NOT take node relocations into account. + * Use {@link #getJobStateModifiedForReassignments} to return a value adjusted to the most + * appropriate value following relocations. + */ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getJobTask(jobId, tasks); if (task != null) { @@ -68,6 +73,36 @@ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustom return JobState.CLOSED; } + public static JobState getJobStateModifiedForReassignments(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { + return getJobStateModifiedForReassignments(getJobTask(jobId, tasks)); + } + + public static JobState getJobStateModifiedForReassignments(@Nullable PersistentTasksCustomMetaData.PersistentTask task) { + if (task == null) { + // A closed job has no persistent task + return JobState.CLOSED; + } + JobTaskState jobTaskState = (JobTaskState) task.getState(); + if (jobTaskState == null) { + return JobState.OPENING; + } + JobState jobState = jobTaskState.getState(); + if (jobTaskState.isStatusStale(task)) { + // the job is re-locating + if (jobState == JobState.CLOSING) { + // previous executor node failed while the job was closing - it won't + // be reopened on another node, so consider it CLOSED for most purposes + return JobState.CLOSED; + } + if (jobState != JobState.FAILED) { + // previous executor node failed and current executor node didn't + // have the chance to set job status to OPENING + return JobState.OPENING; + } + } + return jobState; + } + public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getDatafeedTask(datafeedId, tasks); if (task != null && task.getState() != null) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java index 2e6cc4b99c4bb..d979b897ad43a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java @@ -67,6 +67,17 @@ public JobState getState() { return state; } + /** + * The job state stores the allocation ID at the time it was last set. + * This method compares the allocation ID in the state with the allocation + * ID in the task. If the two are different then the task has been relocated + * to a different node after the last time the state was set. This in turn + * means that the state is not necessarily correct. For example, a job that + * has a state of OPENED but is stale must be considered to be OPENING, because + * it won't yet have a corresponding autodetect process. + * @param task The job task to check. + * @return Has the task been relocated to another node and not had its status set since then? + */ public boolean isStatusStale(PersistentTask task) { return allocationId != task.getAllocationId(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index fad24247834d5..c81a539fb0ea4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -225,31 +225,13 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j Collection> assignedTasks = persistentTasks.findTasks( MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); for (PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedTasks) { - JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); - JobState jobState; - if (jobTaskState == null) { - // executor node didn't have the chance to set job status to OPENING - ++numberOfAllocatingJobs; - jobState = JobState.OPENING; - } else { - jobState = jobTaskState.getState(); - if (jobTaskState.isStatusStale(assignedTask)) { - // the job is re-locating - if (jobState == JobState.CLOSING) { - // previous executor node failed while the job was closing - it won't - // be reopened, so consider it CLOSED for resource usage purposes - jobState = JobState.CLOSED; - } else if (jobState != JobState.FAILED) { - // previous executor node failed and current executor node didn't - // have the chance to set job status to OPENING - ++numberOfAllocatingJobs; - jobState = JobState.OPENING; - } - } - } + JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask); if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) { // Don't count CLOSED or FAILED jobs, as they don't consume native memory ++numberOfAssignedJobs; + if (jobState == JobState.OPENING) { + ++numberOfAllocatingJobs; + } OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams(); Long jobMemoryRequirement = memoryTracker.getJobMemoryRequirement(params.getJobId()); if (jobMemoryRequirement == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 4a0f3da060d02..6367a13100ed0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -161,7 +161,7 @@ public void onFailure(Exception e) { protected void doRun() { Long next = null; try { - next = holder.executeLoopBack(startTime, endTime); + next = holder.executeLookBack(startTime, endTime); } catch (DatafeedJob.ExtractionProblemException e) { if (endTime == null) { next = e.nextDelayInMsSinceEpoch; @@ -253,7 +253,7 @@ private String getJobId(TransportStartDatafeedAction.DatafeedTask task) { } private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStartDatafeedAction.DatafeedTask datafeedTask) { - return MlTasks.getJobState(getJobId(datafeedTask), tasks); + return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks); } private TimeValue computeNextDelay(long next) { @@ -272,7 +272,7 @@ public class Holder { private final TransportStartDatafeedAction.DatafeedTask task; private final long allocationId; private final String datafeedId; - // To ensure that we wait until loopback / realtime search has completed before we stop the datafeed + // To ensure that we wait until lookback / realtime search has completed before we stop the datafeed private final ReentrantLock datafeedJobLock = new ReentrantLock(true); private final DatafeedJob datafeedJob; private final boolean autoCloseJob; @@ -352,7 +352,7 @@ public void setRelocating() { isRelocating = true; } - private Long executeLoopBack(long startTime, Long endTime) throws Exception { + private Long executeLookBack(long startTime, Long endTime) throws Exception { datafeedJobLock.lock(); try { if (isRunning() && !isIsolated()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index cfb16254a9dde..04dfa5f27502d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -575,10 +575,16 @@ public void testJobTaskMatcherMatch() { } public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { + addJobTask(jobId, nodeId, jobState, builder, false); + } + + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder, + boolean isStale) { builder.addTask(MlTasks.jobTaskId(jobId), MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId), - new Assignment(nodeId, "test assignment")); + new Assignment(nodeId, "test assignment")); if (jobState != null) { - builder.updateTaskState(MlTasks.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId())); + builder.updateTaskState(MlTasks.jobTaskId(jobId), + new JobTaskState(jobState, builder.getLastAllocationId() - (isStale ? 1 : 0))); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index edf734544091c..9bf883232c623 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -222,7 +222,7 @@ public void testRealTime_GivenNonStoppingAnalysisProblem() throws Exception { assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); } - public void testStart_GivenNewlyCreatedJobLoopBackAndRealtime() throws Exception { + public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception { when(datafeedJob.runLookBack(anyLong(), anyLong())).thenReturn(1L); when(datafeedJob.runRealtime()).thenReturn(1L); @@ -282,8 +282,45 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); } + public void testDatafeedTaskWaitsUntilJobIsNotStale() { + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job is stale (i.e. even though opened it is part way through relocating) + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder, true); + addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); + + // Still no run + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged( + new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); + + // Now it should run as the job state chanded to OPENED + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + } + public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -296,7 +333,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.FAILED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -309,7 +346,7 @@ public void testDatafeedTaskStopsBecauseJobFailedWhileOpening() { } public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); ClusterState.Builder cs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -326,7 +363,7 @@ public void testDatafeedGetsStoppedWhileWaitingForJobToOpen() { datafeedManager.stopDatafeed(task, "test", StopDatafeedAction.DEFAULT_TIMEOUT); // Update job state to opened - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder updatedCs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));