From 86b01f5d0fe153311743127eecafff3c02ecde69 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 11 Jan 2019 13:22:35 +0000 Subject: [PATCH] [ML] Wait for autodetect to be ready in the datafeed (#37349) This is a reinforcement of #37227. It turns out that persistent tasks are not made stale if the node they were running on is restarted and the master node does not notice this. The main scenario where this happens is when minimum master nodes is the same as the number of nodes in the cluster, so the cluster cannot elect a master node when any node is restarted. When an ML node restarts we need the datafeeds for any jobs that were running on that node to not just wait until the jobs are allocated, but to wait for the autodetect process of the job to start up. In the case of reassignment of the job persistent task this was dealt with by the stale status test. But in the case where a node restarts but its persistent tasks are not reassigned we need a deeper test. Fixes #36810 --- .../xpack/ml/MachineLearning.java | 2 +- .../xpack/ml/datafeed/DatafeedManager.java | 29 ++++++++-- .../autodetect/AutodetectProcessManager.java | 27 +++++++-- .../ml/datafeed/DatafeedManagerTests.java | 55 +++++++++++++++++-- .../60_ml_config_migration.yml | 3 - 5 files changed, 98 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index bdbf73bc1200f..438911f84afed 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -429,7 +429,7 @@ public Collection createComponents(Client client, ClusterService cluster DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry, auditor, System::currentTimeMillis); DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, - System::currentTimeMillis, auditor); + System::currentTimeMillis, auditor, autodetectProcessManager); this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, autodetectProcessManager); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 7944657aaddc0..ed5c3bbc3bcc3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -27,9 +27,11 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.ArrayList; @@ -62,16 +64,18 @@ public class DatafeedManager { private final ConcurrentMap runningDatafeedsOnThisNode = new ConcurrentHashMap<>(); private final DatafeedJobBuilder datafeedJobBuilder; private final TaskRunner taskRunner = new TaskRunner(); + private final AutodetectProcessManager autodetectProcessManager; private volatile boolean isolated; public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder, - Supplier currentTimeSupplier, Auditor auditor) { + Supplier currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.threadPool = threadPool; this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier); this.auditor = Objects.requireNonNull(auditor); this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder); + this.autodetectProcessManager = autodetectProcessManager; clusterService.addListener(taskRunner); } @@ -256,6 +260,21 @@ private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStart return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks); } + private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks, + TransportStartDatafeedAction.DatafeedTask datafeedTask) { + PersistentTasksCustomMetaData.PersistentTask jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks); + if (jobTask == null) { + return false; + } + + JobTaskState state = (JobTaskState) jobTask.getState(); + if (state == null || state.isStatusStale(jobTask)) { + return false; + } + + return autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId()); + } + private TimeValue computeNextDelay(long next) { return new TimeValue(Math.max(1, next - currentTimeSupplier.get())); } @@ -446,7 +465,7 @@ private class TaskRunner implements ClusterStateListener { private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) { ClusterState clusterState = clusterService.state(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (getJobState(tasks, datafeedTask) == JobState.OPENED) { + if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) { runTask(datafeedTask); } else { logger.info("Datafeed [{}] is waiting for job [{}] to be opened", @@ -485,10 +504,10 @@ public void clusterChanged(ClusterChangedEvent event) { continue; } JobState jobState = getJobState(currentTasks, datafeedTask); - if (jobState == JobState.OPENED) { - runTask(datafeedTask); - } else if (jobState == JobState.OPENING) { + if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) { remainingTasks.add(datafeedTask); + } else if (jobState == JobState.OPENED) { + runTask(datafeedTask); } else { logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]", datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 88b04568732d6..470efeebdf32a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -215,6 +215,13 @@ public void killAllProcessesOnThisNode() { */ public void persistJob(JobTask jobTask, Consumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); + if (communicator == null) { + String message = String.format(Locale.ROOT, "Cannot persist because job [%s] does not have a corresponding autodetect process", + jobTask.getJobId()); + logger.debug(message); + handler.accept(ExceptionsHelper.conflictStatusException(message)); + return; + } communicator.persistJob((aVoid, e) -> handler.accept(e)); } @@ -242,7 +249,8 @@ public void processData(JobTask jobTask, AnalysisRegistry analysisRegistry, Inpu XContentType xContentType, DataLoadParams params, BiConsumer handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open"); + throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + + "] does not have a corresponding autodetect process"); } communicator.writeToJob(input, analysisRegistry, xContentType, params, handler); } @@ -260,7 +268,8 @@ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener handler) { AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open"; + String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + + "] does not have a corresponding autodetect process"; logger.debug(message); handler.accept(ExceptionsHelper.conflictStatusException(message)); return; @@ -667,6 +678,14 @@ private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) { return null; } + public boolean hasOpenAutodetectCommunicator(long jobAllocationId) { + ProcessContext processContext = processByAllocation.get(jobAllocationId); + if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) { + return processContext.getAutodetectCommunicator() != null; + } + return false; + } + public Optional jobOpenTime(JobTask jobTask) { AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index 4b0d34fec79c8..eaa69de2c23b5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask; import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests; import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -48,6 +49,7 @@ import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask; @@ -74,13 +76,14 @@ public class DatafeedManagerTests extends ESTestCase { private long currentTime = 120000; private Auditor auditor; private ArgumentCaptor capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class); + private AtomicBoolean hasOpenAutodetectCommunicator; @Before @SuppressWarnings("unchecked") public void setUpTests() { Job.Builder job = createDatafeedJob().setCreateTime(new Date()); - PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder); PersistentTasksCustomMetaData tasks = tasksBuilder.build(); DiscoveryNodes nodes = DiscoveryNodes.builder() @@ -128,7 +131,12 @@ public void setUpTests() { return null; }).when(datafeedJobBuilder).build(any(), any(), any()); - datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor); + hasOpenAutodetectCommunicator = new AtomicBoolean(true); + AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class); + doAnswer(invocation -> hasOpenAutodetectCommunicator.get()).when(autodetectProcessManager).hasOpenAutodetectCommunicator(anyLong()); + + datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor, + autodetectProcessManager); verify(clusterService).addListener(capturedClusterStateListener.capture()); } @@ -259,7 +267,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { // Verify datafeed has not started running yet as job is still opening verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder); addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) @@ -270,7 +278,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { // Still no run verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - tasksBuilder = PersistentTasksCustomMetaData.builder(); + tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state()) .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); @@ -278,7 +286,44 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() { capturedClusterStateListener.getValue().clusterChanged( new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build())); - // Now it should run as the job state chanded to OPENED + // Now it should run as the job state changed to OPENED + verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + } + + public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() { + + hasOpenAutodetectCommunicator.set(false); + + PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder cs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + when(clusterService.state()).thenReturn(cs.build()); + + Consumer handler = mockConsumer(); + DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L); + datafeedManager.run(task, handler); + + // Verify datafeed has not started running yet as job doesn't have an open autodetect communicator + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + tasksBuilder = PersistentTasksCustomMetaData.builder(); + addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder); + addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder); + ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state()) + .metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())); + + capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build())); + + // Still no run + verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); + + hasOpenAutodetectCommunicator.set(true); + + capturedClusterStateListener.getValue().clusterChanged( + new ClusterChangedEvent("_source", cs.build(), anotherJobCs.build())); + + // Now it should run as the autodetect communicator is open verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); } diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml index d8616b4e8277f..be36d7358e794 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/60_ml_config_migration.yml @@ -8,9 +8,6 @@ setup: --- "Test old cluster jobs and datafeeds and delete them": - - skip: - version: "all" - reason: "@AwaitsFix: https://github.com/elastic/elasticsearch/issues/36810" - do: xpack.ml.get_jobs: