diff --git a/docs/reference/settings/data-frames-settings.asciidoc b/docs/reference/settings/transform-settings.asciidoc similarity index 63% rename from docs/reference/settings/data-frames-settings.asciidoc rename to docs/reference/settings/transform-settings.asciidoc index a4568ae3b88df..6f60205951ea1 100644 --- a/docs/reference/settings/data-frames-settings.asciidoc +++ b/docs/reference/settings/transform-settings.asciidoc @@ -1,6 +1,6 @@ [role="xpack"] -[[data-frames-settings]] +[[transform-settings]] === {transforms-cap} settings in Elasticsearch [subs="attributes"] ++++ @@ -9,17 +9,30 @@ You do not need to configure any settings to use {transforms}. It is enabled by default. -All of these settings can be added to the `elasticsearch.yml` configuration file. -The dynamic settings can also be updated across a cluster with the +All of these settings can be added to the `elasticsearch.yml` configuration file. +The dynamic settings can also be updated across a cluster with the <>. -TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml` +TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml` file. [float] -[[general-data-frames-settings]] +[[general-transform-settings]] ==== General {transforms} settings +`node.transform`:: +Set to `true` to identify the node as a _transform node_. The default is `false` if +either `node.data` or `xpack.transform.enabled` is `false` for the node, and `true` otherwise. + ++ +If set to `false` in `elasticsearch.yml`, the node cannot run transforms. If set to +`true` but `xpack.transform.enabled` is set to `false`, the `node.transform` setting is +ignored and the node cannot run transforms. If you want to run transforms, there must be at +least one transform node in your cluster. + ++ +IMPORTANT: It is advised to use the `node.transform` setting to constrain the execution +of transforms to certain nodes instead of using `xpack.transform.enabled`. On dedicated +coordinating nodes or dedicated master nodes, disable the node.transform role. + `xpack.transform.enabled`:: Set to `true` (default) to enable {transforms} on the node. + + diff --git a/docs/reference/setup.asciidoc b/docs/reference/setup.asciidoc index d0d26afe5feff..57b46e1e40a0a 100644 --- a/docs/reference/setup.asciidoc +++ b/docs/reference/setup.asciidoc @@ -51,7 +51,7 @@ include::settings/audit-settings.asciidoc[] include::settings/ccr-settings.asciidoc[] -include::settings/data-frames-settings.asciidoc[] +include::settings/transform-settings.asciidoc[] include::settings/ilm-settings.asciidoc[] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfig.java index b59a7912cf701..b3478e89520c0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfig.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; import java.io.IOException; @@ -98,6 +99,10 @@ public boolean isValid() { return queryConfig.isValid(); } + public boolean requiresRemoteCluster() { + return Arrays.stream(index).anyMatch(RemoteClusterLicenseChecker::isRemoteIndex); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(index); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java index ea5d97e049e08..6b7965b74df44 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformTaskParams.java @@ -25,29 +25,35 @@ public class TransformTaskParams extends AbstractDiffable i public static final String NAME = TransformField.TASK_NAME; public static final ParseField FREQUENCY = TransformField.FREQUENCY; + public static final ParseField REQUIRES_REMOTE = new ParseField("requires_remote"); private final String transformId; private final Version version; private final TimeValue frequency; + private final Boolean requiresRemote; public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2])); + a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2], (Boolean) a[3])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), TransformField.ID); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TransformField.VERSION); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REQUIRES_REMOTE); } - private TransformTaskParams(String transformId, String version, String frequency) { + private TransformTaskParams(String transformId, String version, String frequency, Boolean remote) { this(transformId, version == null ? null : Version.fromString(version), - frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName())); + frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()), + remote == null ? false : remote.booleanValue() + ); } - public TransformTaskParams(String transformId, Version version, TimeValue frequency) { + public TransformTaskParams(String transformId, Version version, TimeValue frequency, boolean remote) { this.transformId = transformId; this.version = version == null ? Version.V_7_2_0 : version; this.frequency = frequency; + this.requiresRemote = remote; } public TransformTaskParams(StreamInput in) throws IOException { @@ -62,6 +68,11 @@ public TransformTaskParams(StreamInput in) throws IOException { } else { this.frequency = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0 + this.requiresRemote = in.readBoolean(); + } else { + this.requiresRemote = false; + } } @Override @@ -83,6 +94,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { out.writeOptionalTimeValue(frequency); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0 + out.writeBoolean(requiresRemote); + } } @Override @@ -93,6 +107,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (frequency != null) { builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); } + builder.field(REQUIRES_REMOTE.getPreferredName(), requiresRemote); builder.endObject(); return builder; } @@ -109,6 +124,10 @@ public TimeValue getFrequency() { return frequency; } + public boolean requiresRemote() { + return requiresRemote; + } + public static TransformTaskParams fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } @@ -127,11 +146,12 @@ public boolean equals(Object other) { return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version) - && Objects.equals(this.frequency, that.frequency); + && Objects.equals(this.frequency, that.frequency) + && this.requiresRemote == that.requiresRemote; } @Override public int hashCode() { - return Objects.hash(transformId, version, frequency); + return Objects.hash(transformId, version, frequency, requiresRemote); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfigTests.java index 972bb9cc83138..629a9485a190f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SourceConfigTests.java @@ -59,4 +59,21 @@ protected Reader instanceReader() { return SourceConfig::new; } + public void testRequiresRemoteCluster() { + assertFalse(new SourceConfig(new String [] {"index1", "index2", "index3"}, + QueryConfigTests.randomQueryConfig()).requiresRemoteCluster()); + + assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "index3"}, + QueryConfigTests.randomQueryConfig()).requiresRemoteCluster()); + + assertTrue(new SourceConfig(new String [] {"index1", "index2", "remote3:index3"}, + QueryConfigTests.randomQueryConfig()).requiresRemoteCluster()); + + assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "remote3:index3"}, + QueryConfigTests.randomQueryConfig()).requiresRemoteCluster()); + + assertTrue(new SourceConfig(new String [] {"remote1:index1"}, + QueryConfigTests.randomQueryConfig()).requiresRemoteCluster()); + } + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java index 3f0ecdc04f16d..a5fb99c70bd00 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformTests.java @@ -27,7 +27,7 @@ protected TransformTaskParams doParseInstance(XContentParser parser) throws IOEx @Override protected TransformTaskParams createTestInstance() { return new TransformTaskParams(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT, - randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000))); + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean()); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index e787a253a5f2d..912ac73c1254d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -14,12 +14,14 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; @@ -39,6 +41,7 @@ import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; @@ -137,6 +140,23 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa Setting.Property.Dynamic ); + /** + * Node attributes for transform, automatically created and retrievable via cluster state. + * These attributes should never be set directly, use the node setting counter parts instead. + */ + public static final String TRANSFORM_ENABLED_NODE_ATTR = "transform.node"; + public static final String TRANSFORM_REMOTE_ENABLED_NODE_ATTR = "transform.remote_connect"; + + /** + * Setting whether transform (the coordinator task) can run on this node and REST API's are available, + * respects xpack.transform.enabled (for the whole plugin) as fallback + */ + public static final Setting TRANSFORM_ENABLED_NODE = Setting.boolSetting( + "node.transform", + settings -> Boolean.toString(XPackSettings.TRANSFORM_ENABLED.get(settings) && DiscoveryNode.isDataNode(settings)), + Property.NodeScope + ); + public Transform(Settings settings) { this.settings = settings; this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings); @@ -222,8 +242,14 @@ public List> getExecutorBuilders(Settings settings) { return emptyList(); } - FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool", - false); + FixedExecutorBuilder indexing = new FixedExecutorBuilder( + settings, + TASK_THREAD_POOL_NAME, + 4, + 4, + "transform.task_thread_pool", + false + ); return Collections.singletonList(indexing); } @@ -296,13 +322,44 @@ public List> getPersistentTasksExecutor( // the transform services should have been created assert transformServices.get() != null; - return Collections.singletonList(new TransformPersistentTasksExecutor(client, transformServices.get(), threadPool, clusterService, - settingsModule.getSettings(), expressionResolver)); + return Collections.singletonList( + new TransformPersistentTasksExecutor( + client, + transformServices.get(), + threadPool, + clusterService, + settingsModule.getSettings(), + expressionResolver + ) + ); } @Override public List> getSettings() { - return Collections.singletonList(NUM_FAILURE_RETRIES_SETTING); + return Collections.unmodifiableList(Arrays.asList(TRANSFORM_ENABLED_NODE, NUM_FAILURE_RETRIES_SETTING)); + } + + @Override + public Settings additionalSettings() { + String transformEnabledNodeAttribute = "node.attr." + TRANSFORM_ENABLED_NODE_ATTR; + String transformRemoteEnabledNodeAttribute = "node.attr." + TRANSFORM_REMOTE_ENABLED_NODE_ATTR; + + if (settings.get(transformEnabledNodeAttribute) != null || settings.get(transformRemoteEnabledNodeAttribute) != null) { + throw new IllegalArgumentException( + "Directly setting transform node attributes is not permitted, please use the documented node settings instead" + ); + } + + if (enabled == false) { + return Settings.EMPTY; + } + + Settings.Builder additionalSettings = Settings.builder(); + + additionalSettings.put(transformEnabledNodeAttribute, TRANSFORM_ENABLED_NODE.get(settings)); + additionalSettings.put(transformRemoteEnabledNodeAttribute, RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings)); + + return additionalSettings.build(); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformUsageTransportAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformUsageTransportAction.java index 219cb9e06b326..19e26053b27b5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformUsageTransportAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformUsageTransportAction.java @@ -54,19 +54,36 @@ public class TransformUsageTransportAction extends XPackUsageFeatureTransportAct private final Client client; @Inject - public TransformUsageTransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Settings settings, XPackLicenseState licenseState, Client client) { - super(XPackUsageFeatureAction.TRANSFORM.name(), transportService, clusterService, - threadPool, actionFilters, indexNameExpressionResolver); + public TransformUsageTransportAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + Settings settings, + XPackLicenseState licenseState, + Client client + ) { + super( + XPackUsageFeatureAction.TRANSFORM.name(), + transportService, + clusterService, + threadPool, + actionFilters, + indexNameExpressionResolver + ); this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings); this.licenseState = licenseState; this.client = client; } @Override - protected void masterOperation(Task task, XPackUsageRequest request, ClusterState state, - ActionListener listener) { + protected void masterOperation( + Task task, + XPackUsageRequest request, + ClusterState state, + ActionListener listener + ) { boolean available = licenseState.isTransformAllowed(); if (enabled == false) { var usage = new TransformFeatureSetUsage(available, enabled, Collections.emptyMap(), new TransformIndexerStats()); @@ -75,61 +92,66 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat } PersistentTasksCustomMetaData taskMetadata = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(state); - Collection> transformTasks = taskMetadata == null ? - Collections.emptyList() : - taskMetadata.findTasks(TransformTaskParams.NAME, (t) -> true); + Collection> transformTasks = taskMetadata == null + ? Collections.emptyList() + : taskMetadata.findTasks(TransformTaskParams.NAME, (t) -> true); final int taskCount = transformTasks.size(); final Map transformsCountByState = new HashMap<>(); - for(PersistentTasksCustomMetaData.PersistentTask transformTask : transformTasks) { - TransformState transformState = (TransformState)transformTask.getState(); - transformsCountByState.merge(transformState.getTaskState().value(), 1L, Long::sum); + for (PersistentTasksCustomMetaData.PersistentTask transformTask : transformTasks) { + TransformState transformState = (TransformState) transformTask.getState(); + TransformTaskState taskState = transformState.getTaskState(); + if (taskState != null) { + transformsCountByState.merge(taskState.value(), 1L, Long::sum); + } } - ActionListener totalStatsListener = ActionListener.wrap( - statSummations -> { - var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, statSummations); - listener.onResponse(new XPackUsageFeatureResponse(usage)); - }, - listener::onFailure - ); + ActionListener totalStatsListener = ActionListener.wrap(statSummations -> { + var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, statSummations); + listener.onResponse(new XPackUsageFeatureResponse(usage)); + }, listener::onFailure); - ActionListener totalTransformCountListener = ActionListener.wrap( - transformCountSuccess -> { - if (transformCountSuccess.getShardFailures().length > 0) { - logger.error("total transform count search returned shard failures: {}", - Arrays.toString(transformCountSuccess.getShardFailures())); - } - long totalTransforms = transformCountSuccess.getHits().getTotalHits().value; - if (totalTransforms == 0) { - var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, - new TransformIndexerStats()); - listener.onResponse(new XPackUsageFeatureResponse(usage)); - return; - } - transformsCountByState.merge(TransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum); + ActionListener totalTransformCountListener = ActionListener.wrap(transformCountSuccess -> { + if (transformCountSuccess.getShardFailures().length > 0) { + logger.error( + "total transform count search returned shard failures: {}", + Arrays.toString(transformCountSuccess.getShardFailures()) + ); + } + long totalTransforms = transformCountSuccess.getHits().getTotalHits().value; + if (totalTransforms == 0) { + var usage = new TransformFeatureSetUsage(available, enabled, transformsCountByState, new TransformIndexerStats()); + listener.onResponse(new XPackUsageFeatureResponse(usage)); + return; + } + transformsCountByState.merge(TransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum); + TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener); + }, transformCountFailure -> { + if (transformCountFailure instanceof ResourceNotFoundException) { TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener); - }, - transformCountFailure -> { - if (transformCountFailure instanceof ResourceNotFoundException) { - TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener); - } else { - listener.onFailure(transformCountFailure); - } + } else { + listener.onFailure(transformCountFailure); } - ); + }); - SearchRequest totalTransformCount = client - .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN, - TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) + SearchRequest totalTransformCount = client.prepareSearch( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED + ) .setTrackTotalHits(true) - .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME)))) + .setQuery( + QueryBuilders.constantScoreQuery( + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME)) + ) + ) .request(); - ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ClientHelper.executeAsyncWithOrigin( + client.threadPool().getThreadContext(), ClientHelper.TRANSFORM_ORIGIN, totalTransformCount, totalTransformCountListener, - client::search); + client::search + ); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index ee3e42246650c..be5dd13aee314 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -265,7 +265,9 @@ protected void masterOperation( ); return; } - transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency())); + transformTaskHolder.set( + createTransform(config.getId(), config.getVersion(), config.getFrequency(), config.getSource().requiresRemoteCluster()) + ); transformConfigHolder.set(config); if (config.getDestination().getPipeline() != null) { if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) { @@ -311,8 +313,13 @@ protected ClusterBlockException checkBlock(StartTransformAction.Request request, return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } - private static TransformTaskParams createTransform(String transformId, Version transformVersion, TimeValue frequency) { - return new TransformTaskParams(transformId, transformVersion, frequency); + private static TransformTaskParams createTransform( + String transformId, + Version transformVersion, + TimeValue frequency, + Boolean requiresRemoteCluster + ) { + return new TransformTaskParams(transformId, transformVersion, frequency, requiresRemoteCluster); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index db752d2d10093..56db82f256553 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -11,6 +11,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.IndicesOptions; @@ -49,8 +50,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public class TransformPersistentTasksExecutor extends PersistentTasksExecutor { @@ -100,9 +103,88 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(TransformTaskParam } DiscoveryNode discoveryNode = selectLeastLoadedNode( clusterState, - (node) -> node.isDataNode() && node.getVersion().onOrAfter(params.getVersion()) + (node) -> node.getVersion().onOrAfter(Version.V_8_0_0) + ? nodeCanRunThisTransform(node, params, null) + : nodeCanRunThisTransformPre77(node, params, null) ); - return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), ""); + + if (discoveryNode == null) { + Map explainWhyAssignmentFailed = new TreeMap<>(); + for (DiscoveryNode node : clusterState.getNodes()) { + if (node.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0, remove from 8.0 + nodeCanRunThisTransform(node, params, explainWhyAssignmentFailed); + } else { + nodeCanRunThisTransformPre77(node, params, explainWhyAssignmentFailed); + } + } + String reason = "Not starting transform [" + + params.getId() + + "], reasons [" + + explainWhyAssignmentFailed.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining("|")) + + "]"; + + logger.debug(reason); + return new PersistentTasksCustomMetaData.Assignment(null, reason); + } + + return new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), ""); + } + + // todo: this can be removed for 8.0 after backport + public static boolean nodeCanRunThisTransformPre77(DiscoveryNode node, TransformTaskParams params, Map explain) { + if (node.isDataNode() == false) { + if (explain != null) { + explain.put(node.getId(), "not a data node"); + } + return false; + } + + // version of the transform run on a node that has at least the same version + if (node.getVersion().onOrAfter(params.getVersion()) == false) { + if (explain != null) { + explain.put( + node.getId(), + "node has version: " + node.getVersion() + " but transform requires at least " + params.getVersion() + ); + } + return false; + } + + return true; + } + + public static boolean nodeCanRunThisTransform(DiscoveryNode node, TransformTaskParams params, Map explain) { + // version of the transform run on a node that has at least the same version + if (node.getVersion().onOrAfter(params.getVersion()) == false) { + if (explain != null) { + explain.put( + node.getId(), + "node has version: " + node.getVersion() + " but transform requires at least " + params.getVersion() + ); + } + return false; + } + + final Map nodeAttributes = node.getAttributes(); + + // transform enabled? + if (Boolean.parseBoolean(nodeAttributes.get(Transform.TRANSFORM_ENABLED_NODE_ATTR)) == false) { + if (explain != null) { + explain.put(node.getId(), "not a transform node"); + } + return false; + } + + // does the transform require a remote and remote is enabled? + if (params.requiresRemote() && Boolean.parseBoolean(nodeAttributes.get(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR)) == false) { + if (explain != null) { + explain.put(node.getId(), "transform requires a remote connection but remote is disabled"); + } + return false; + } + + // we found no reason that the transform can not run on this node + return true; } static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterState, IndexNameExpressionResolver resolver) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java new file mode 100644 index 0000000000000..9e6b5979baa64 --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformTests.java @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class TransformTests extends ESTestCase { + + public void testNodeAttributes() { + Settings.Builder builder = Settings.builder(); + boolean transformEnabled = true; + boolean remoteEnabled = true; + + if (randomBoolean()) { + transformEnabled = randomBoolean(); + if (randomBoolean()) { + builder.put("node.transform", transformEnabled); + if (randomBoolean()) { + // note: the case where node.transform: true and xpack.transform.enabled: false is benign + builder.put("xpack.transform.enabled", randomBoolean()); + } + } else { + builder.put("xpack.transform.enabled", transformEnabled); + } + } + + if (randomBoolean()) { + remoteEnabled = randomBoolean(); + builder.put("cluster.remote.connect", remoteEnabled); + } + + builder.put("node.attr.some_other_attrib", "value"); + Transform transform = createTransform(builder.build()); + assertNotNull(transform.additionalSettings()); + assertEquals(transformEnabled, Boolean.parseBoolean(transform.additionalSettings().get("node.attr.transform.node"))); + assertEquals( + transformEnabled && remoteEnabled, + Boolean.parseBoolean(transform.additionalSettings().get("node.attr.transform.remote_connect")) + ); + } + + public void testNodeAttributesDirectlyGiven() { + Settings.Builder builder = Settings.builder(); + + if (randomBoolean()) { + builder.put("node.attr.transform.node", randomBoolean()); + } else { + builder.put("node.attr.transform.remote_connect", randomBoolean()); + } + + Transform transform = createTransform(builder.build()); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> transform.additionalSettings()); + assertThat( + e.getMessage(), + equalTo("Directly setting transform node attributes is not permitted, please use the documented node settings instead") + ); + } + + private Transform createTransform(Settings settings) { + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + return new Transform(settings) { + @Override + protected XPackLicenseState getLicenseState() { + return licenseState; + } + }; + } + +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java index 2ce3d8f882a27..61c04bd0088cc 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java @@ -30,38 +30,43 @@ public void testTransformNodes() { String transformIdBar = "df-id-bar"; PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - tasksBuilder.addTask(transformIdFoo, - TransformField.TASK_NAME, new TransformTaskParams(transformIdFoo, Version.CURRENT, null), - new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); - tasksBuilder.addTask(transformIdBar, - TransformField.TASK_NAME, new TransformTaskParams(transformIdBar, Version.CURRENT, null), - new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); + tasksBuilder.addTask( + transformIdFoo, + TransformField.TASK_NAME, + new TransformTaskParams(transformIdFoo, Version.CURRENT, null, false), + new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment") + ); + tasksBuilder.addTask( + transformIdBar, + TransformField.TASK_NAME, + new TransformTaskParams(transformIdBar, Version.CURRENT, null, false), + new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment") + ); tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() { - @Override - public String getWriteableName() { - return "testTasks"; - } + @Override + public String getWriteableName() { + return "testTasks"; + } - @Override - public Version getMinimalSupportedVersion() { - return null; - } + @Override + public Version getMinimalSupportedVersion() { + return null; + } - @Override - public void writeTo(StreamOutput out) { + @Override + public void writeTo(StreamOutput out) { - } + } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) { - return null; - } - }, - new PersistentTasksCustomMetaData.Assignment("node-3", "test assignment")); + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) { + return null; + } + }, new PersistentTasksCustomMetaData.Assignment("node-3", "test assignment")); ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) - .build(); + .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build())) + .build(); String[] nodes = TransformNodes.transformTaskNodes(Arrays.asList(transformIdFoo, transformIdBar), cs); assertEquals(2, nodes.length); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java index 0ca86c3657f62..311f4a9b6f0fb 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportStopTransformActionTests.java @@ -17,8 +17,8 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformMessages; -import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformState; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import java.util.ArrayList; @@ -48,35 +48,35 @@ public void testTaskStateValidationWithNoTasks() { public void testTaskStateValidationWithTransformTasks() { // Test with the task state being null PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() - .addTask("non-failed-task", + .addTask( + "non-failed-task", TransformTaskParams.NAME, - new TransformTaskParams("transform-task-1", Version.CURRENT, null), - new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")); + new TransformTaskParams("transform-task-1", Version.CURRENT, null, false), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "") + ); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); TransportStopTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); // test again with a non failed task but this time it has internal state - pTasksBuilder.updateTaskState("non-failed-task", new TransformState(TransformTaskState.STOPPED, - IndexerState.STOPPED, - null, - 0L, - null, - null)); + pTasksBuilder.updateTaskState( + "non-failed-task", + new TransformState(TransformTaskState.STOPPED, IndexerState.STOPPED, null, 0L, null, null) + ); csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); TransportStopTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); - pTasksBuilder.addTask("failed-task", + pTasksBuilder.addTask( + "failed-task", TransformTaskParams.NAME, - new TransformTaskParams("transform-task-1", Version.CURRENT, null), - new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")) - .updateTaskState("failed-task", new TransformState(TransformTaskState.FAILED, - IndexerState.STOPPED, - null, - 0L, - "task has failed", - null)); + new TransformTaskParams("transform-task-1", Version.CURRENT, null, false), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "") + ) + .updateTaskState( + "failed-task", + new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0L, "task has failed", null) + ); csBuilder = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); TransportStopTransformAction.validateTaskState(csBuilder.build(), Arrays.asList("non-failed-task", "failed-task"), true); @@ -84,51 +84,59 @@ public void testTaskStateValidationWithTransformTasks() { TransportStopTransformAction.validateTaskState(csBuilder.build(), Collections.singletonList("non-failed-task"), false); ClusterState.Builder csBuilderFinal = ClusterState.builder(new ClusterName("_name")).metaData(buildMetadata(pTasksBuilder.build())); - ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, - () -> TransportStopTransformAction.validateTaskState(csBuilderFinal.build(), - Collections.singletonList("failed-task"), - false)); + ElasticsearchStatusException ex = expectThrows( + ElasticsearchStatusException.class, + () -> TransportStopTransformAction.validateTaskState(csBuilderFinal.build(), Collections.singletonList("failed-task"), false) + ); assertThat(ex.status(), equalTo(CONFLICT)); - assertThat(ex.getMessage(), - equalTo(TransformMessages.getMessage(TransformMessages.CANNOT_STOP_FAILED_TRANSFORM, - "failed-task", - "task has failed"))); + assertThat( + ex.getMessage(), + equalTo(TransformMessages.getMessage(TransformMessages.CANNOT_STOP_FAILED_TRANSFORM, "failed-task", "task has failed")) + ); } public void testFirstNotOKStatus() { List nodeFailures = new ArrayList<>(); List taskOperationFailures = new ArrayList<>(); - nodeFailures.add(new ElasticsearchException("nodefailure", - new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY))); - taskOperationFailures.add(new TaskOperationFailure("node", - 1, - new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST))); - - assertThat(TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()), - equalTo(RestStatus.INTERNAL_SERVER_ERROR)); - - assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()), - equalTo(RestStatus.BAD_REQUEST)); - assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures), - equalTo(RestStatus.BAD_REQUEST)); - assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, - Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK)))), - equalTo(RestStatus.BAD_REQUEST)); - - assertThat(TransportStopTransformAction.firstNotOKStatus( - Collections.singletonList(new TaskOperationFailure( - "node", - 1, - new ElasticsearchStatusException("not failure", RestStatus.OK))), - nodeFailures), - equalTo(RestStatus.INTERNAL_SERVER_ERROR)); - - assertThat(TransportStopTransformAction.firstNotOKStatus( - Collections.emptyList(), - nodeFailures), - equalTo(RestStatus.INTERNAL_SERVER_ERROR)); + nodeFailures.add( + new ElasticsearchException("nodefailure", new ElasticsearchStatusException("failure", RestStatus.UNPROCESSABLE_ENTITY)) + ); + taskOperationFailures.add(new TaskOperationFailure("node", 1, new ElasticsearchStatusException("failure", RestStatus.BAD_REQUEST))); + + assertThat( + TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), Collections.emptyList()), + equalTo(RestStatus.INTERNAL_SERVER_ERROR) + ); + + assertThat( + TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, Collections.emptyList()), + equalTo(RestStatus.BAD_REQUEST) + ); + assertThat(TransportStopTransformAction.firstNotOKStatus(taskOperationFailures, nodeFailures), equalTo(RestStatus.BAD_REQUEST)); + assertThat( + TransportStopTransformAction.firstNotOKStatus( + taskOperationFailures, + Collections.singletonList(new ElasticsearchException(new ElasticsearchStatusException("not failure", RestStatus.OK))) + ), + equalTo(RestStatus.BAD_REQUEST) + ); + + assertThat( + TransportStopTransformAction.firstNotOKStatus( + Collections.singletonList( + new TaskOperationFailure("node", 1, new ElasticsearchStatusException("not failure", RestStatus.OK)) + ), + nodeFailures + ), + equalTo(RestStatus.INTERNAL_SERVER_ERROR) + ); + + assertThat( + TransportStopTransformAction.firstNotOKStatus(Collections.emptyList(), nodeFailures), + equalTo(RestStatus.INTERNAL_SERVER_ERROR) + ); } public void testBuildException() { @@ -136,13 +144,16 @@ public void testBuildException() { List taskOperationFailures = new ArrayList<>(); nodeFailures.add(new ElasticsearchException("node failure")); - taskOperationFailures.add(new TaskOperationFailure("node", - 1, - new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST))); + taskOperationFailures.add( + new TaskOperationFailure("node", 1, new ElasticsearchStatusException("task failure", RestStatus.BAD_REQUEST)) + ); RestStatus status = CONFLICT; - ElasticsearchStatusException statusException = - TransportStopTransformAction.buildException(taskOperationFailures, nodeFailures, status); + ElasticsearchStatusException statusException = TransportStopTransformAction.buildException( + taskOperationFailures, + nodeFailures, + status + ); assertThat(statusException.status(), equalTo(status)); assertThat(statusException.getMessage(), equalTo(taskOperationFailures.get(0).getCause().getMessage())); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index f34e1a2b78535..117bfc4d661da 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; @@ -41,8 +42,12 @@ import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import static org.hamcrest.Matchers.equalTo; @@ -52,114 +57,115 @@ public class TransformPersistentTasksExecutorTests extends ESTestCase { public void testNodeVersionAssignment() { - MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addIndices(metaData, routingTable); - PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() - .addTask( - "transform-task-1", - TransformTaskParams.NAME, - new TransformTaskParams("transform-task-1", Version.CURRENT, null), - new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "") - ) - .addTask( - "transform-task-2", - TransformTaskParams.NAME, - new TransformTaskParams("transform-task-2", Version.CURRENT, null), - new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "") - ) - .addTask( - "transform-task-3", - TransformTaskParams.NAME, - new TransformTaskParams("transform-task-3", Version.CURRENT, null), - new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "") - ); + DiscoveryNodes.Builder nodes = buildNodes(false, true, true, true, true); + ClusterState cs = buildClusterState(nodes); + TransformPersistentTasksExecutor executor = buildTaskExecutor(); - PersistentTasksCustomMetaData pTasks = pTasksBuilder.build(); + assertThat( + executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, true), cs).getExecutorNode(), + equalTo("current-data-node-with-1-tasks") + ); + assertThat( + executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs).getExecutorNode(), + equalTo("current-data-node-with-0-tasks-transform-remote-disabled") + ); + assertThat( + executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_5_0, null, true), cs).getExecutorNode(), + equalTo("past-data-node-1") + ); + } - metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks); + public void testNodeAssignmentProblems() { + // no data nodes + DiscoveryNodes.Builder nodes = buildNodes(false, false, false, false, true); + ClusterState cs = buildClusterState(nodes); + TransformPersistentTasksExecutor executor = buildTaskExecutor(); - DiscoveryNodes.Builder nodes = DiscoveryNodes.builder() - .add( - new DiscoveryNode( - "past-data-node-1", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE), - Version.V_7_2_0 - ) - ) - .add( - new DiscoveryNode( - "current-data-node-with-2-tasks", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE), - Version.CURRENT - ) - ) - .add( - new DiscoveryNode( - "non-data-node-1", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Set.of(DiscoveryNodeRole.MASTER_ROLE), - Version.CURRENT - ) - ) - .add( - new DiscoveryNode( - "current-data-node-with-1-tasks", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE), - Version.CURRENT - ) - ); + Assignment assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs); + assertNull(assignment.getExecutorNode()); + assertThat( + assignment.getExplanation(), + equalTo("Not starting transform [new-task-id], reasons [current-data-node-with-transform-disabled:not a transform node]") + ); - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).nodes(nodes); - csBuilder.routingTable(routingTable.build()); - csBuilder.metaData(metaData); + // dedicated transform node + nodes = buildNodes(true, false, false, false, true); + cs = buildClusterState(nodes); + executor = buildTaskExecutor(); - ClusterState cs = csBuilder.build(); - Client client = mock(Client.class); - TransformAuditor mockAuditor = mock(TransformAuditor.class); - IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry()); - TransformCheckpointService transformCheckpointService = new TransformCheckpointService( - client, - Settings.EMPTY, - new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), - transformsConfigManager, - mockAuditor - ); - TransformServices transformServices = new TransformServices( - transformsConfigManager, - transformCheckpointService, - mockAuditor, - mock(SchedulerEngine.class) - ); + assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs); + assertNotNull(assignment.getExecutorNode()); + assertThat(assignment.getExecutorNode(), equalTo("dedicated-transform-node")); - ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(Transform.NUM_FAILURE_RETRIES_SETTING)); - ClusterService clusterService = mock(ClusterService.class); - when(clusterService.getClusterSettings()).thenReturn(cSettings); - when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE); - TransformPersistentTasksExecutor executor = new TransformPersistentTasksExecutor( - client, - transformServices, - mock(ThreadPool.class), - clusterService, - Settings.EMPTY, - new IndexNameExpressionResolver() + // only an old node + nodes = buildNodes(false, true, false, false, true); + cs = buildClusterState(nodes); + executor = buildTaskExecutor(); + + assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_7_0, null, false), cs); + assertNull(assignment.getExecutorNode()); + assertThat( + assignment.getExplanation(), + equalTo( + "Not starting transform [new-task-id], reasons [" + + "current-data-node-with-transform-disabled:not a transform node" + + "|" + + "past-data-node-1:node has version: 7.5.0 but transform requires at least 7.7.0" + + "]" + ) ); + assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, false), cs); + assertNotNull(assignment.getExecutorNode()); + assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1")); + + // no remote + nodes = buildNodes(false, false, false, true, false); + cs = buildClusterState(nodes); + executor = buildTaskExecutor(); + + assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs); + assertNull(assignment.getExecutorNode()); assertThat( - executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null), cs).getExecutorNode(), - equalTo("current-data-node-with-1-tasks") + assignment.getExplanation(), + equalTo( + "Not starting transform [new-task-id], reasons [" + + "current-data-node-with-0-tasks-transform-remote-disabled:" + + "transform requires a remote connection but remote is disabled" + + "]" + ) ); + + assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.CURRENT, null, false), cs); + assertNotNull(assignment.getExecutorNode()); + assertThat(assignment.getExecutorNode(), equalTo("current-data-node-with-0-tasks-transform-remote-disabled")); + + // no remote and disabled + nodes = buildNodes(false, false, false, true, true); + cs = buildClusterState(nodes); + executor = buildTaskExecutor(); + + assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs); + assertNull(assignment.getExecutorNode()); assertThat( - executor.getAssignment(new TransformTaskParams("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(), - equalTo("past-data-node-1") + assignment.getExplanation(), + equalTo( + "Not starting transform [new-task-id], reasons [" + + "current-data-node-with-0-tasks-transform-remote-disabled:" + + "transform requires a remote connection but remote is disabled" + + "|" + + "current-data-node-with-transform-disabled:not a transform node" + + "]" + ) ); + // old node, we do not know if remote is enabled + nodes = buildNodes(false, true, false, true, false); + cs = buildClusterState(nodes); + executor = buildTaskExecutor(); + + assignment = executor.getAssignment(new TransformTaskParams("new-task-id", Version.V_7_5_0, null, true), cs); + assertNotNull(assignment.getExecutorNode()); + assertThat(assignment.getExecutorNode(), equalTo("past-data-node-1")); } public void testVerifyIndicesPrimaryShardsAreActive() { @@ -172,8 +178,7 @@ public void testVerifyIndicesPrimaryShardsAreActive() { csBuilder.metaData(metaData); ClusterState cs = csBuilder.build(); - assertEquals(0, - TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, new IndexNameExpressionResolver()).size()); + assertEquals(0, TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(cs, new IndexNameExpressionResolver()).size()); metaData = new MetaData.Builder(cs.metaData()); routingTable = new RoutingTable.Builder(cs.routingTable()); @@ -197,8 +202,10 @@ public void testVerifyIndicesPrimaryShardsAreActive() { csBuilder.routingTable(routingTable.build()); csBuilder.metaData(metaData); - List result = - TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive(csBuilder.build(), new IndexNameExpressionResolver()); + List result = TransformPersistentTasksExecutor.verifyIndicesPrimaryShardsAreActive( + csBuilder.build(), + new IndexNameExpressionResolver() + ); assertEquals(1, result.size()); assertEquals(indexToRemove, result.get(0)); } @@ -232,4 +239,163 @@ private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingT } } + private DiscoveryNodes.Builder buildNodes( + boolean dedicatedTransformNode, + boolean pastDataNode, + boolean transformRemoteNodes, + boolean transformLocanOnlyNodes, + boolean currentDataNode + ) { + + Map transformNodeAttributes = new HashMap<>(); + transformNodeAttributes.put(Transform.TRANSFORM_ENABLED_NODE_ATTR, "true"); + transformNodeAttributes.put(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR, "true"); + Map transformNodeAttributesDisabled = new HashMap<>(); + transformNodeAttributesDisabled.put(Transform.TRANSFORM_ENABLED_NODE_ATTR, "false"); + transformNodeAttributesDisabled.put(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR, "true"); + Map transformNodeAttributesNoRemote = new HashMap<>(); + transformNodeAttributesNoRemote.put(Transform.TRANSFORM_ENABLED_NODE_ATTR, "true"); + transformNodeAttributesNoRemote.put(Transform.TRANSFORM_REMOTE_ENABLED_NODE_ATTR, "false"); + + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); + + if (dedicatedTransformNode) { + nodes.add( + new DiscoveryNode( + "dedicated-transform-node", + buildNewFakeTransportAddress(), + transformNodeAttributes, + Collections.singleton(DiscoveryNodeRole.MASTER_ROLE), + Version.CURRENT + ) + ); + } + + if (pastDataNode) { + nodes.add( + new DiscoveryNode( + "past-data-node-1", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)), + Version.V_7_5_0 + ) + ); + } + + if (transformRemoteNodes) { + nodes.add( + new DiscoveryNode( + "current-data-node-with-2-tasks", + buildNewFakeTransportAddress(), + transformNodeAttributes, + new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE)), + Version.CURRENT + ) + ) + .add( + new DiscoveryNode( + "current-data-node-with-1-tasks", + buildNewFakeTransportAddress(), + transformNodeAttributes, + new HashSet<>(Arrays.asList(DiscoveryNodeRole.MASTER_ROLE)), + Version.CURRENT + ) + ); + } + + if (transformLocanOnlyNodes) { + nodes.add( + new DiscoveryNode( + "current-data-node-with-0-tasks-transform-remote-disabled", + buildNewFakeTransportAddress(), + transformNodeAttributesNoRemote, + new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)), + Version.CURRENT + ) + ); + } + + if (currentDataNode) { + nodes.add( + new DiscoveryNode( + "current-data-node-with-transform-disabled", + buildNewFakeTransportAddress(), + transformNodeAttributesDisabled, + Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE), + Version.CURRENT + ) + ); + } + + return nodes; + } + + private ClusterState buildClusterState(DiscoveryNodes.Builder nodes) { + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addIndices(metaData, routingTable); + PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() + .addTask( + "transform-task-1", + TransformTaskParams.NAME, + new TransformTaskParams("transform-task-1", Version.CURRENT, null, false), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "") + ) + .addTask( + "transform-task-2", + TransformTaskParams.NAME, + new TransformTaskParams("transform-task-2", Version.CURRENT, null, false), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "") + ) + .addTask( + "transform-task-3", + TransformTaskParams.NAME, + new TransformTaskParams("transform-task-3", Version.CURRENT, null, false), + new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "") + ); + + PersistentTasksCustomMetaData pTasks = pTasksBuilder.build(); + metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")).nodes(nodes); + csBuilder.routingTable(routingTable.build()); + csBuilder.metaData(metaData); + + return csBuilder.build(); + + } + + public TransformPersistentTasksExecutor buildTaskExecutor() { + Client client = mock(Client.class); + TransformAuditor mockAuditor = mock(TransformAuditor.class); + IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry()); + TransformCheckpointService transformCheckpointService = new TransformCheckpointService( + client, + Settings.EMPTY, + new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), + transformsConfigManager, + mockAuditor + ); + TransformServices transformServices = new TransformServices( + transformsConfigManager, + transformCheckpointService, + mockAuditor, + mock(SchedulerEngine.class) + ); + + ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(Transform.NUM_FAILURE_RETRIES_SETTING)); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(cSettings); + when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE); + + return new TransformPersistentTasksExecutor( + client, + transformServices, + mock(ThreadPool.class), + clusterService, + Settings.EMPTY, + new IndexNameExpressionResolver() + ); + } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index c2bd7352f66e6..418df16602899 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -96,7 +96,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { "some_type", "some_action", TaskId.EMPTY_TASK_ID, - new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10)), + new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10), false), transformState, mock(SchedulerEngine.class), auditor, @@ -176,7 +176,7 @@ public void testStopOnFailedTaskWithoutIndexer() { "some_type", "some_action", TaskId.EMPTY_TASK_ID, - new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10)), + new TransformTaskParams(transformConfig.getId(), Version.CURRENT, TimeValue.timeValueSeconds(10), false), transformState, mock(SchedulerEngine.class), auditor,