diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 63b8f90a114d0..4f9fb89509cb7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; @@ -17,6 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -291,7 +293,12 @@ protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAct threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - listener.onFailure(e); + if (e instanceof ResourceNotFoundException && Strings.isAllOrWildcard(new String[]{request.getJobId()})) { + jobTask.closeJob("close job (api)"); + listener.onResponse(new CloseJobAction.Response(true)); + } else { + listener.onFailure(e); + } } @Override @@ -356,7 +363,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getJobId()})) == false) { + failures.set(slot - 1, e); + } if (slot == numberOfJobs) { sendResponseOrFailure(request.getJobId(), listener, failures); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 77910f21f67d1..5d9d900aaa8dd 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -193,7 +194,10 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask persisten @Override public void onFailure(Exception e) { final int slot = counter.incrementAndGet(); - failures.set(slot - 1, e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) { + failures.set(slot - 1, e); + } if (slot == startedDatafeeds.size()) { sendResponseOrFailure(request.getDatafeedId(), listener, failures); } @@ -221,7 +225,13 @@ protected void taskOperation(StopDatafeedAction.Request request, TransportStartD threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - listener.onFailure(e); + if ((e instanceof ResourceNotFoundException && + Strings.isAllOrWildcard(new String[]{request.getDatafeedId()}))) { + datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout()); + listener.onResponse(new StopDatafeedAction.Response(true)); + } else { + listener.onFailure(e); + } } @Override