diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java index a4e5f034b5154..90ef9d808997e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/IndexLifecycleNamedXContentProvider.java @@ -56,7 +56,10 @@ public List getNamedXContentParsers() { FreezeAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), - SetPriorityAction::parse) + SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, + new ParseField(UnfollowAction.NAME), + UnfollowAction::parse) ); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java index 1a0f80b740ee7..5e4ae1f36bcbc 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicy.java @@ -57,10 +57,10 @@ public class LifecyclePolicy implements ToXContentObject { throw new IllegalArgumentException("ordered " + PHASES_FIELD.getPreferredName() + " are not supported"); }, PHASES_FIELD); - ALLOWED_ACTIONS.put("hot", Sets.newHashSet(SetPriorityAction.NAME, RolloverAction.NAME)); - ALLOWED_ACTIONS.put("warm", Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME, ForceMergeAction.NAME, + ALLOWED_ACTIONS.put("hot", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, RolloverAction.NAME)); + ALLOWED_ACTIONS.put("warm", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, ForceMergeAction.NAME, ReadOnlyAction.NAME, ShrinkAction.NAME)); - ALLOWED_ACTIONS.put("cold", Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME)); + ALLOWED_ACTIONS.put("cold", Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME)); ALLOWED_ACTIONS.put("delete", Sets.newHashSet(DeleteAction.NAME)); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/UnfollowAction.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/UnfollowAction.java new file mode 100644 index 0000000000000..ba25cf937ec8f --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indexlifecycle/UnfollowAction.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.indexlifecycle; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; + +public class UnfollowAction implements LifecycleAction, ToXContentObject { + public static final String NAME = "unfollow"; + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, UnfollowAction::new); + + public UnfollowAction() {} + + @Override + public String getName() { + return NAME; + } + + public static UnfollowAction parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return 36970; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + return true; + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java index 08ec5a5b3fe09..4ad6d2e6ce604 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java @@ -48,6 +48,7 @@ import org.elasticsearch.client.indexlifecycle.ShrinkAction; import org.elasticsearch.client.indexlifecycle.StartILMRequest; import org.elasticsearch.client.indexlifecycle.StopILMRequest; +import org.elasticsearch.client.indexlifecycle.UnfollowAction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.hamcrest.Matchers; @@ -144,19 +145,20 @@ public void testStartStopILM() throws Exception { public void testExplainLifecycle() throws Exception { Map lifecyclePhases = new HashMap<>(); - Map hotActions = Collections.singletonMap( - RolloverAction.NAME, - new RolloverAction(null, TimeValue.timeValueHours(50 * 24), null)); + Map hotActions = new HashMap<>(); + hotActions.put(RolloverAction.NAME, new RolloverAction(null, TimeValue.timeValueHours(50 * 24), null)); Phase hotPhase = new Phase("hot", randomFrom(TimeValue.ZERO, null), hotActions); lifecyclePhases.put("hot", hotPhase); Map warmActions = new HashMap<>(); + warmActions.put(UnfollowAction.NAME, new UnfollowAction()); warmActions.put(AllocateAction.NAME, new AllocateAction(null, null, null, Collections.singletonMap("_name", "node-1"))); warmActions.put(ShrinkAction.NAME, new ShrinkAction(1)); warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1000)); lifecyclePhases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(1000), warmActions)); Map coldActions = new HashMap<>(); + coldActions.put(UnfollowAction.NAME, new UnfollowAction()); coldActions.put(AllocateAction.NAME, new AllocateAction(0, null, null, null)); lifecyclePhases.put("cold", new Phase("cold", TimeValue.timeValueSeconds(2000), coldActions)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 6995fcf099ad2..1ea6056368051 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -56,6 +56,7 @@ import org.elasticsearch.client.indexlifecycle.RolloverAction; import org.elasticsearch.client.indexlifecycle.SetPriorityAction; import org.elasticsearch.client.indexlifecycle.ShrinkAction; +import org.elasticsearch.client.indexlifecycle.UnfollowAction; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.bytes.BytesReference; @@ -645,7 +646,7 @@ public void testDefaultNamedXContents() { public void testProvidedNamedXContents() { List namedXContents = RestHighLevelClient.getProvidedNamedXContents(); - assertEquals(19, namedXContents.size()); + assertEquals(20, namedXContents.size()); Map, Integer> categories = new HashMap<>(); List names = new ArrayList<>(); for (NamedXContentRegistry.Entry namedXContent : namedXContents) { @@ -669,7 +670,8 @@ public void testProvidedNamedXContents() { assertTrue(names.contains(MeanReciprocalRank.NAME)); assertTrue(names.contains(DiscountedCumulativeGain.NAME)); assertTrue(names.contains(ExpectedReciprocalRank.NAME)); - assertEquals(Integer.valueOf(8), categories.get(LifecycleAction.class)); + assertEquals(Integer.valueOf(9), categories.get(LifecycleAction.class)); + assertTrue(names.contains(UnfollowAction.NAME)); assertTrue(names.contains(AllocateAction.NAME)); assertTrue(names.contains(DeleteAction.NAME)); assertTrue(names.contains(ForceMergeAction.NAME)); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/GetLifecyclePolicyResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/GetLifecyclePolicyResponseTests.java index 0fb7b29067f22..c16c270512ca6 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/GetLifecyclePolicyResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/GetLifecyclePolicyResponseTests.java @@ -68,7 +68,8 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyMetadataTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyMetadataTests.java index 25bfa5a4c43d2..6d8014c432c28 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyMetadataTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyMetadataTests.java @@ -64,7 +64,8 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyTests.java index 4f04f814471c1..1690f66572142 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/LifecyclePolicyTests.java @@ -39,10 +39,11 @@ import static org.hamcrest.Matchers.equalTo; public class LifecyclePolicyTests extends AbstractXContentTestCase { - private static final Set VALID_HOT_ACTIONS = Sets.newHashSet(SetPriorityAction.NAME, RolloverAction.NAME); - private static final Set VALID_WARM_ACTIONS = Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME, + private static final Set VALID_HOT_ACTIONS = Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, RolloverAction.NAME); + private static final Set VALID_WARM_ACTIONS = Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, ForceMergeAction.NAME, ReadOnlyAction.NAME, ShrinkAction.NAME); - private static final Set VALID_COLD_ACTIONS = Sets.newHashSet(SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME); + private static final Set VALID_COLD_ACTIONS = Sets.newHashSet(UnfollowAction.NAME, SetPriorityAction.NAME, AllocateAction.NAME, + FreezeAction.NAME); private static final Set VALID_DELETE_ACTIONS = Sets.newHashSet(DeleteAction.NAME); private String lifecycleName; @@ -68,7 +69,8 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); } @@ -213,6 +215,8 @@ public static LifecyclePolicy createRandomPolicy(String lifecycleName) { return new FreezeAction(); case SetPriorityAction.NAME: return SetPriorityActionTests.randomInstance(); + case UnfollowAction.NAME: + return new UnfollowAction(); default: throw new IllegalArgumentException("invalid action [" + action + "]"); }}; @@ -246,6 +250,8 @@ private LifecycleAction getTestAction(String actionName) { return new FreezeAction(); case SetPriorityAction.NAME: return SetPriorityActionTests.randomInstance(); + case UnfollowAction.NAME: + return new UnfollowAction(); default: throw new IllegalArgumentException("unsupported phase action [" + actionName + "]"); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/UnfollowActionTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/UnfollowActionTests.java new file mode 100644 index 0000000000000..4dd73c5a08ec2 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/indexlifecycle/UnfollowActionTests.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.indexlifecycle; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class UnfollowActionTests extends AbstractXContentTestCase { + + @Override + protected UnfollowAction createTestInstance() { + return new UnfollowAction(); + } + + @Override + protected UnfollowAction doParseInstance(XContentParser parser) throws IOException { + return UnfollowAction.parse(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/docs/reference/ilm/policy-definitions.asciidoc b/docs/reference/ilm/policy-definitions.asciidoc index 2f71c20e2c76a..5ccbe58b71730 100644 --- a/docs/reference/ilm/policy-definitions.asciidoc +++ b/docs/reference/ilm/policy-definitions.asciidoc @@ -87,16 +87,19 @@ The below list shows the actions which are available in each phase. * Hot - <> - <> + - <> * Warm - <> - <> - <> - <> - <> + - <> * Cold - <> - <> - <> + - <> * Delete - <> @@ -616,6 +619,43 @@ PUT _ilm/policy/my_policy -------------------------------------------------- // CONSOLE +[[ilm-unfollow-action]] +==== Unfollow + +This action turns a {ref}/ccr-apis.html[ccr] follower index +into a regular index. This can be desired when moving follower +indices into the next phase. Also certain actions like shrink +and rollover can then be performed safely on follower indices. + +If the unfollow action encounters a follower index then +the following operations will be performed on it: + +* Pauses indexing following for the follower index. +* Closes the follower index. +* Unfollows the follower index. +* Opens the follower index (which is at this point is a regular index). + +The unfollow action does not have any options and +if it encounters a non follower index, then the +unfollow action leaves that index untouched and +lets the next action operate on this index. + +[source,js] +-------------------------------------------------- +PUT _ilm/policy/my_policy +{ + "policy": { + "phases": { + "hot": { + "actions": { + "unfollow" : {} + } + } + } + } +} +-------------------------------------------------- +// CONSOLE === Full Policy diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 6865cd58c0dac..a121217d4cdaa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -57,6 +57,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction; @@ -429,7 +430,8 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), - new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new) + new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new) ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java new file mode 100644 index 0000000000000..8e0626425b490 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; + +import java.util.Map; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; + +abstract class AbstractUnfollowIndexStep extends AsyncActionStep { + + AbstractUnfollowIndexStep(StepKey key, StepKey nextStepKey, Client client) { + super(key, nextStepKey, client); + } + + @Override + public final void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) { + String followerIndex = indexMetaData.getIndex().getName(); + Map customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY); + if (customIndexMetadata == null) { + listener.onResponse(true); + return; + } + + innerPerformAction(followerIndex, listener); + } + + abstract void innerPerformAction(String followerIndex, Listener listener); +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java new file mode 100644 index 0000000000000..3fb6e145236bc --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.client.Client; + +final class CloseFollowerIndexStep extends AbstractUnfollowIndexStep { + + static final String NAME = "close-follower-index"; + + CloseFollowerIndexStep(StepKey key, StepKey nextStepKey, Client client) { + super(key, nextStepKey, client); + } + + @Override + void innerPerformAction(String followerIndex, Listener listener) { + CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex); + getClient().admin().indices().close(closeIndexRequest, ActionListener.wrap( + r -> { + assert r.isAcknowledged() : "close index response is not acknowledged"; + listener.onResponse(true); + }, + listener::onFailure) + ); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java new file mode 100644 index 0000000000000..7ba2c4633ab99 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; + +final class OpenFollowerIndexStep extends AsyncActionStep { + + static final String NAME = "open-follower-index"; + + OpenFollowerIndexStep(StepKey key, StepKey nextStepKey, Client client) { + super(key, nextStepKey, client); + } + + @Override + public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) { + OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName()); + getClient().admin().indices().open(request, ActionListener.wrap( + r -> { + assert r.isAcknowledged() : "open index response is not acknowledged"; + listener.onResponse(true); + }, + listener::onFailure + )); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PauseFollowerIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PauseFollowerIndexStep.java new file mode 100644 index 0000000000000..72b38c7b72797 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/PauseFollowerIndexStep.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; + +final class PauseFollowerIndexStep extends AbstractUnfollowIndexStep { + + static final String NAME = "pause-follower-index"; + + PauseFollowerIndexStep(StepKey key, StepKey nextStepKey, Client client) { + super(key, nextStepKey, client); + } + + @Override + void innerPerformAction(String followerIndex, Listener listener) { + PauseFollowAction.Request request = new PauseFollowAction.Request(followerIndex); + getClient().execute(PauseFollowAction.INSTANCE, request, ActionListener.wrap( + r -> { + assert r.isAcknowledged() : "pause follow response is not acknowledged"; + listener.onResponse(true); + }, + listener::onFailure + )); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java index 5dad5725ba9ba..4d1c770cea4bc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleType.java @@ -34,10 +34,11 @@ public class TimeseriesLifecycleType implements LifecycleType { public static final String TYPE = "timeseries"; static final List VALID_PHASES = Arrays.asList("hot", "warm", "cold", "delete"); - static final List ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, RolloverAction.NAME); - static final List ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME, - ShrinkAction.NAME, ForceMergeAction.NAME); - static final List ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME); + static final List ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME); + static final List ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, + AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME); + static final List ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME, + FreezeAction.NAME); static final List ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(DeleteAction.NAME); static final Set VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS); static final Set VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowAction.java new file mode 100644 index 0000000000000..20a0fb75b9daa --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowAction.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +/** + * Converts a CCR following index into a normal, standalone index, once the index is ready to be safely separated. + * + * "Readiness" is composed of two conditions: + * 1) The index must have {@link LifecycleSettings#LIFECYCLE_INDEXING_COMPLETE} set to {@code true}, which is + * done automatically by {@link RolloverAction} (or manually). + * 2) The index must be up to date with the leader, defined as the follower checkpoint being + * equal to the global checkpoint for all shards. + */ +public final class UnfollowAction implements LifecycleAction { + + public static final String NAME = "unfollow"; + public static final String CCR_METADATA_KEY = "ccr"; + + public UnfollowAction() {} + + @Override + public List toSteps(Client client, String phase, StepKey nextStepKey) { + StepKey indexingComplete = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME); + StepKey waitForFollowShardTasks = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME); + StepKey pauseFollowerIndex = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME); + StepKey closeFollowerIndex = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME); + StepKey unfollowFollowerIndex = new StepKey(phase, NAME, UnfollowFollowIndexStep.NAME); + StepKey openFollowerIndex = new StepKey(phase, NAME, OpenFollowerIndexStep.NAME); + StepKey waitForYellowStep = new StepKey(phase, NAME, WaitForYellowStep.NAME); + + WaitForIndexingCompleteStep step1 = new WaitForIndexingCompleteStep(indexingComplete, waitForFollowShardTasks); + WaitForFollowShardTasksStep step2 = new WaitForFollowShardTasksStep(waitForFollowShardTasks, pauseFollowerIndex, client); + PauseFollowerIndexStep step3 = new PauseFollowerIndexStep(pauseFollowerIndex, closeFollowerIndex, client); + CloseFollowerIndexStep step4 = new CloseFollowerIndexStep(closeFollowerIndex, unfollowFollowerIndex, client); + UnfollowFollowIndexStep step5 = new UnfollowFollowIndexStep(unfollowFollowerIndex, openFollowerIndex, client); + OpenFollowerIndexStep step6 = new OpenFollowerIndexStep(openFollowerIndex, waitForYellowStep, client); + WaitForYellowStep step7 = new WaitForYellowStep(waitForYellowStep, nextStepKey); + return Arrays.asList(step1, step2, step3, step4, step5, step6, step7); + } + + @Override + public List toStepKeys(String phase) { + StepKey indexingCompleteStep = new StepKey(phase, NAME, WaitForIndexingCompleteStep.NAME); + StepKey waitForFollowShardTasksStep = new StepKey(phase, NAME, WaitForFollowShardTasksStep.NAME); + StepKey pauseFollowerIndexStep = new StepKey(phase, NAME, PauseFollowerIndexStep.NAME); + StepKey closeFollowerIndexStep = new StepKey(phase, NAME, CloseFollowerIndexStep.NAME); + StepKey unfollowIndexStep = new StepKey(phase, NAME, UnfollowFollowIndexStep.NAME); + StepKey openFollowerIndexStep = new StepKey(phase, NAME, OpenFollowerIndexStep.NAME); + StepKey waitForYellowStep = new StepKey(phase, NAME, WaitForYellowStep.NAME); + return Arrays.asList(indexingCompleteStep, waitForFollowShardTasksStep, pauseFollowerIndexStep, + closeFollowerIndexStep, unfollowIndexStep, openFollowerIndexStep, waitForYellowStep); + } + + @Override + public boolean isSafeAction() { + // There are no settings to change, so therefor this action should be safe: + return true; + } + + @Override + public String getWriteableName() { + return NAME; + } + + public UnfollowAction(StreamInput in) throws IOException {} + + @Override + public void writeTo(StreamOutput out) throws IOException {} + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, UnfollowAction::new); + + public static UnfollowAction parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return 36970; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (obj.getClass() != getClass()) { + return false; + } + return true; + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStep.java new file mode 100644 index 0000000000000..953450bbc763b --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStep.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; + +final class UnfollowFollowIndexStep extends AbstractUnfollowIndexStep { + + static final String NAME = "unfollow-follower-index"; + + UnfollowFollowIndexStep(StepKey key, StepKey nextStepKey, Client client) { + super(key, nextStepKey, client); + } + + @Override + void innerPerformAction(String followerIndex, Listener listener) { + UnfollowAction.Request request = new UnfollowAction.Request(followerIndex); + getClient().execute(UnfollowAction.INSTANCE, request, ActionListener.wrap( + r -> { + assert r.isAcknowledged() : "unfollow response is not acknowledged"; + listener.onResponse(true); + }, + listener::onFailure + )); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStep.java new file mode 100644 index 0000000000000..f3938a1d3da2b --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStep.java @@ -0,0 +1,181 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; + +final class WaitForFollowShardTasksStep extends AsyncWaitStep { + + static final String NAME = "wait-for-follow-shard-tasks"; + + WaitForFollowShardTasksStep(StepKey key, StepKey nextStepKey, Client client) { + super(key, nextStepKey, client); + } + + @Override + public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) { + Map customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY); + if (customIndexMetadata == null) { + listener.onResponse(true, null); + return; + } + + FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest(); + request.setIndices(new String[]{indexMetaData.getIndex().getName()}); + getClient().execute(FollowStatsAction.INSTANCE, request, + ActionListener.wrap(r -> handleResponse(r, listener), listener::onFailure)); + } + + void handleResponse(FollowStatsAction.StatsResponses responses, Listener listener) { + List unSyncedShardFollowStatuses = responses.getStatsResponses() + .stream() + .map(FollowStatsAction.StatsResponse::status) + .filter(shardFollowStatus -> shardFollowStatus.leaderGlobalCheckpoint() != shardFollowStatus.followerGlobalCheckpoint()) + .collect(Collectors.toList()); + + // Follow stats api needs to return stats for follower index and all shard follow tasks should be synced: + boolean conditionMet = responses.getStatsResponses().size() > 0 && unSyncedShardFollowStatuses.isEmpty(); + if (conditionMet) { + listener.onResponse(true, null); + } else { + List shardFollowTaskInfos = unSyncedShardFollowStatuses + .stream() + .map(status -> new Info.ShardFollowTaskInfo(status.followerIndex(), status.getShardId(), + status.leaderGlobalCheckpoint(), status.followerGlobalCheckpoint())) + .collect(Collectors.toList()); + listener.onResponse(false, new Info(shardFollowTaskInfos)); + } + } + + static final class Info implements ToXContentObject { + + static final ParseField SHARD_FOLLOW_TASKS = new ParseField("shard_follow_tasks"); + static final ParseField MESSAGE = new ParseField("message"); + + private final List shardFollowTaskInfos; + + Info(List shardFollowTaskInfos) { + this.shardFollowTaskInfos = shardFollowTaskInfos; + } + + List getShardFollowTaskInfos() { + return shardFollowTaskInfos; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SHARD_FOLLOW_TASKS.getPreferredName(), shardFollowTaskInfos); + String message; + if (shardFollowTaskInfos.size() > 0) { + message = "Waiting for [" + shardFollowTaskInfos.size() + "] shard follow tasks to be in sync"; + } else { + message = "Waiting for following to be unpaused and all shard follow tasks to be up to date"; + } + builder.field(MESSAGE.getPreferredName(), message); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Info info = (Info) o; + return Objects.equals(shardFollowTaskInfos, info.shardFollowTaskInfos); + } + + @Override + public int hashCode() { + return Objects.hash(shardFollowTaskInfos); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + static final class ShardFollowTaskInfo implements ToXContentObject { + + static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); + static final ParseField SHARD_ID_FIELD = new ParseField("shard_id"); + static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); + static final ParseField FOLLOWER_GLOBAL_CHECKPOINT_FIELD = new ParseField("follower_global_checkpoint"); + + private final String followerIndex; + private final int shardId; + private final long leaderGlobalCheckpoint; + private final long followerGlobalCheckpoint; + + ShardFollowTaskInfo(String followerIndex, int shardId, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { + this.followerIndex = followerIndex; + this.shardId = shardId; + this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; + this.followerGlobalCheckpoint = followerGlobalCheckpoint; + } + + String getFollowerIndex() { + return followerIndex; + } + + + int getShardId() { + return shardId; + } + + long getLeaderGlobalCheckpoint() { + return leaderGlobalCheckpoint; + } + + long getFollowerGlobalCheckpoint() { + return followerGlobalCheckpoint; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); + builder.field(SHARD_ID_FIELD.getPreferredName(), shardId); + builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint); + builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShardFollowTaskInfo that = (ShardFollowTaskInfo) o; + return shardId == that.shardId && + leaderGlobalCheckpoint == that.leaderGlobalCheckpoint && + followerGlobalCheckpoint == that.followerGlobalCheckpoint && + Objects.equals(followerIndex, that.followerIndex); + } + + @Override + public int hashCode() { + return Objects.hash(followerIndex, shardId, leaderGlobalCheckpoint, followerGlobalCheckpoint); + } + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForIndexingCompleteStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForIndexingCompleteStep.java new file mode 100644 index 0000000000000..3f795a88dd85b --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForIndexingCompleteStep.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; + +final class WaitForIndexingCompleteStep extends ClusterStateWaitStep { + private static final Logger logger = LogManager.getLogger(WaitForIndexingCompleteStep.class); + + static final String NAME = "wait-for-indexing-complete"; + + WaitForIndexingCompleteStep(StepKey key, StepKey nextStepKey) { + super(key, nextStepKey); + } + + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + IndexMetaData followerIndex = clusterState.metaData().index(index); + if (followerIndex == null) { + // Index must have been since deleted, ignore it + logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), index.getName()); + return new Result(false, null); + } + Map customIndexMetadata = followerIndex.getCustomData(CCR_METADATA_KEY); + if (customIndexMetadata == null) { + return new Result(true, null); + } + + boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(followerIndex.getSettings()); + if (indexingComplete) { + return new Result(true, null); + } else { + return new Result(false, new IndexingNotCompleteInfo()); + } + } + + static final class IndexingNotCompleteInfo implements ToXContentObject { + + static final ParseField MESSAGE_FIELD = new ParseField("message"); + static final ParseField INDEXING_COMPLETE = new ParseField(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE); + + private final String message; + + IndexingNotCompleteInfo() { + this.message = "waiting for the [" + LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE + + "] setting to be set to true on the leader index, it is currently [false]"; + } + + String getMessage() { + return message; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(MESSAGE_FIELD.getPreferredName(), message); + builder.field(INDEXING_COMPLETE.getPreferredName(), false); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IndexingNotCompleteInfo info = (IndexingNotCompleteInfo) o; + return Objects.equals(getMessage(), info.getMessage()); + } + + @Override + public int hashCode() { + return Objects.hash(getMessage()); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForYellowStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForYellowStep.java new file mode 100644 index 0000000000000..75be80199e9b2 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForYellowStep.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; + +import java.io.IOException; +import java.util.Objects; + +class WaitForYellowStep extends ClusterStateWaitStep { + + static final String NAME = "wait-for-yellow-step"; + + WaitForYellowStep(StepKey key, StepKey nextStepKey) { + super(key, nextStepKey); + } + + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexShardRoutingTable = routingTable.index(index); + if (indexShardRoutingTable == null) { + return new Result(false, new Info("index is red; no IndexRoutingTable")); + } + + boolean indexIsAtLeastYellow = indexShardRoutingTable.allPrimaryShardsActive(); + if (indexIsAtLeastYellow) { + return new Result(true, null); + } else { + return new Result(false, new Info("index is red; not all primary shards are active")); + } + } + + static final class Info implements ToXContentObject { + + static final ParseField MESSAGE_FIELD = new ParseField("message"); + + private final String message; + + Info(String message) { + this.message = message; + } + + String getMessage() { + return message; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(MESSAGE_FIELD.getPreferredName(), message); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Info info = (Info) o; + return Objects.equals(getMessage(), info.getMessage()); + } + + @Override + public int hashCode() { + return Objects.hash(getMessage()); + } + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStepTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStepTestCase.java new file mode 100644 index 0000000000000..5ceb8ca657006 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStepTestCase.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.mockito.Mockito; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public abstract class AbstractUnfollowIndexStepTestCase extends AbstractStepTestCase { + + @Override + protected final T createRandomInstance() { + Step.StepKey stepKey = randomStepKey(); + Step.StepKey nextStepKey = randomStepKey(); + return newInstance(stepKey, nextStepKey, Mockito.mock(Client.class)); + } + + @Override + protected final T mutateInstance(T instance) { + Step.StepKey key = instance.getKey(); + Step.StepKey nextKey = instance.getNextStepKey(); + + if (randomBoolean()) { + key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } else { + nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } + + return newInstance(key, nextKey, instance.getClient()); + } + + @Override + protected final T copyInstance(T instance) { + return newInstance(instance.getKey(), instance.getNextStepKey(), instance.getClient()); + } + + public final void testNotAFollowerIndex() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Client client = Mockito.mock(Client.class); + T step = newInstance(randomStepKey(), randomStepKey(), client); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], is(true)); + assertThat(failure[0], nullValue()); + Mockito.verifyZeroInteractions(client); + } + + protected abstract T newInstance(Step.StepKey key, Step.StepKey nextKey, Client client); +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java new file mode 100644 index 0000000000000..528021189e107 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.mockito.Mockito; + +import java.util.Collections; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class CloseFollowerIndexStepTests extends AbstractUnfollowIndexStepTestCase { + + @Override + protected CloseFollowerIndexStep newInstance(Step.StepKey key, Step.StepKey nextKey, Client client) { + return new CloseFollowerIndexStep(key, nextKey, client); + } + + public void testCloseFollowingIndex() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocation -> { + CloseIndexRequest closeIndexRequest = (CloseIndexRequest) invocation.getArguments()[0]; + assertThat(closeIndexRequest.indices()[0], equalTo("follower-index")); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesClient).close(Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], is(true)); + assertThat(failure[0], nullValue()); + } + + public void testCloseFollowingIndexFailed() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + // Mock pause follow api call: + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Exception error = new RuntimeException(); + Mockito.doAnswer(invocation -> { + CloseIndexRequest closeIndexRequest = (CloseIndexRequest) invocation.getArguments()[0]; + assertThat(closeIndexRequest.indices()[0], equalTo("follower-index")); + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onFailure(error); + return null; + }).when(indicesClient).close(Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], nullValue()); + assertThat(failure[0], sameInstance(error)); + Mockito.verify(indicesClient).close(Mockito.any(), Mockito.any()); + Mockito.verifyNoMoreInteractions(indicesClient); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyMetadataTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyMetadataTests.java index d943f7ea65308..fcca1cf01c0dd 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyMetadataTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyMetadataTests.java @@ -45,7 +45,8 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), - new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new) + new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new) )); } @@ -62,7 +63,8 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyTests.java index 34e09824ed4b4..1730213e68363 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyTests.java @@ -54,7 +54,8 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), - new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new) + new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new) )); } @@ -71,7 +72,8 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); } @@ -120,6 +122,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Null return new FreezeAction(); case SetPriorityAction.NAME: return SetPriorityActionTests.randomInstance(); + case UnfollowAction.NAME: + return new UnfollowAction(); default: throw new IllegalArgumentException("invalid action [" + action + "]"); }}; @@ -170,6 +174,8 @@ public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String l return new FreezeAction(); case SetPriorityAction.NAME: return SetPriorityActionTests.randomInstance(); + case UnfollowAction.NAME: + return new UnfollowAction(); default: throw new IllegalArgumentException("invalid action [" + action + "]"); }}; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStepTests.java new file mode 100644 index 0000000000000..2d5086ec88fac --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStepTests.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.mockito.Mockito; + +import java.util.Collections; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class OpenFollowerIndexStepTests extends AbstractStepTestCase { + + @Override + protected OpenFollowerIndexStep createRandomInstance() { + Step.StepKey stepKey = randomStepKey(); + Step.StepKey nextStepKey = randomStepKey(); + return new OpenFollowerIndexStep(stepKey, nextStepKey, Mockito.mock(Client.class)); + } + + @Override + protected OpenFollowerIndexStep mutateInstance(OpenFollowerIndexStep instance) { + Step.StepKey key = instance.getKey(); + Step.StepKey nextKey = instance.getNextStepKey(); + + if (randomBoolean()) { + key = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } else { + nextKey = new Step.StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } + + return new OpenFollowerIndexStep(key, nextKey, instance.getClient()); + } + + @Override + protected OpenFollowerIndexStep copyInstance(OpenFollowerIndexStep instance) { + return new OpenFollowerIndexStep(instance.getKey(), instance.getNextStepKey(), instance.getClient()); + } + + public void testOpenFollowingIndex() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocation -> { + OpenIndexRequest closeIndexRequest = (OpenIndexRequest) invocation.getArguments()[0]; + assertThat(closeIndexRequest.indices()[0], equalTo("follower-index")); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onResponse(new OpenIndexResponse(true, true)); + return null; + }).when(indicesClient).open(Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + OpenFollowerIndexStep step = new OpenFollowerIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], is(true)); + assertThat(failure[0], nullValue()); + } + + public void testOpenFollowingIndexFailed() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Exception error = new RuntimeException(); + Mockito.doAnswer(invocation -> { + OpenIndexRequest closeIndexRequest = (OpenIndexRequest) invocation.getArguments()[0]; + assertThat(closeIndexRequest.indices()[0], equalTo("follower-index")); + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onFailure(error); + return null; + }).when(indicesClient).open(Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + OpenFollowerIndexStep step = new OpenFollowerIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], nullValue()); + assertThat(failure[0], sameInstance(error)); + Mockito.verify(indicesClient).open(Mockito.any(), Mockito.any()); + Mockito.verifyNoMoreInteractions(indicesClient); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/PauseFollowerIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/PauseFollowerIndexStepTests.java new file mode 100644 index 0000000000000..fa877ef080ff4 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/PauseFollowerIndexStepTests.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.mockito.Mockito; + +import java.util.Collections; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class PauseFollowerIndexStepTests extends AbstractUnfollowIndexStepTestCase { + + @Override + protected PauseFollowerIndexStep newInstance(Step.StepKey key, Step.StepKey nextKey, Client client) { + return new PauseFollowerIndexStep(key, nextKey, client); + } + + public void testPauseFollowingIndex() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocation -> { + PauseFollowAction.Request request = (PauseFollowAction.Request) invocation.getArguments()[1]; + assertThat(request.getFollowIndex(), equalTo("follower-index")); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(client).execute(Mockito.same(PauseFollowAction.INSTANCE), Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + PauseFollowerIndexStep step = new PauseFollowerIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], is(true)); + assertThat(failure[0], nullValue()); + } + + public void testPauseFollowingIndexFailed() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + // Mock pause follow api call: + Client client = Mockito.mock(Client.class); + Exception error = new RuntimeException(); + Mockito.doAnswer(invocation -> { + PauseFollowAction.Request request = (PauseFollowAction.Request) invocation.getArguments()[1]; + assertThat(request.getFollowIndex(), equalTo("follower-index")); + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onFailure(error); + return null; + }).when(client).execute(Mockito.same(PauseFollowAction.INSTANCE), Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + PauseFollowerIndexStep step = new PauseFollowerIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], nullValue()); + assertThat(failure[0], sameInstance(error)); + Mockito.verify(client).execute(Mockito.same(PauseFollowAction.INSTANCE), Mockito.any(), Mockito.any()); + Mockito.verifyNoMoreInteractions(client); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleTypeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleTypeTests.java index 76c8b1dd515ed..4efb34873d471 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleTypeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/TimeseriesLifecycleTypeTests.java @@ -40,6 +40,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase { private static final ReadOnlyAction TEST_READ_ONLY_ACTION = new ReadOnlyAction(); private static final FreezeAction TEST_FREEZE_ACTION = new FreezeAction(); private static final SetPriorityAction TEST_PRIORITY_ACTION = new SetPriorityAction(0); + private static final UnfollowAction TEST_UNFOLLOW_ACTION = new UnfollowAction(); public void testValidatePhases() { boolean invalid = randomBoolean(); @@ -305,10 +306,14 @@ public void testGetPreviousPhaseName() { public void testGetNextActionName() { // Hot Phase + assertNextActionName("hot", SetPriorityAction.NAME, UnfollowAction.NAME, + new String[] {UnfollowAction.NAME, RolloverAction.NAME}); + assertNextActionName("hot", SetPriorityAction.NAME, RolloverAction.NAME, new String[]{RolloverAction.NAME}); assertNextActionName("hot", SetPriorityAction.NAME, null, new String[] {}); - assertNextActionName("hot", SetPriorityAction.NAME, RolloverAction.NAME, new String[]{SetPriorityAction.NAME, RolloverAction.NAME}); + assertNextActionName("hot", RolloverAction.NAME, null, new String[] {}); assertNextActionName("hot", RolloverAction.NAME, null, new String[] { RolloverAction.NAME }); + assertInvalidAction("hot", "foo", new String[] { RolloverAction.NAME }); assertInvalidAction("hot", AllocateAction.NAME, new String[] { RolloverAction.NAME }); assertInvalidAction("hot", DeleteAction.NAME, new String[] { RolloverAction.NAME }); @@ -317,6 +322,9 @@ public void testGetNextActionName() { assertInvalidAction("hot", ShrinkAction.NAME, new String[] { RolloverAction.NAME }); // Warm Phase + assertNextActionName("warm", SetPriorityAction.NAME, UnfollowAction.NAME, + new String[]{SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME, + ShrinkAction.NAME, ForceMergeAction.NAME}); assertNextActionName("warm", SetPriorityAction.NAME, ReadOnlyAction.NAME, new String[]{SetPriorityAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME}); assertNextActionName("warm", SetPriorityAction.NAME, AllocateAction.NAME, @@ -327,6 +335,17 @@ public void testGetNextActionName() { new String[]{SetPriorityAction.NAME, ForceMergeAction.NAME}); assertNextActionName("warm", SetPriorityAction.NAME, null, new String[]{SetPriorityAction.NAME}); + assertNextActionName("warm", UnfollowAction.NAME, ReadOnlyAction.NAME, + new String[] { SetPriorityAction.NAME, ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME }); + assertNextActionName("warm", UnfollowAction.NAME, ReadOnlyAction.NAME, + new String[] { ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME }); + assertNextActionName("warm", UnfollowAction.NAME, AllocateAction.NAME, + new String[] { AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME }); + assertNextActionName("warm", UnfollowAction.NAME, ShrinkAction.NAME, + new String[] { ShrinkAction.NAME, ForceMergeAction.NAME }); + assertNextActionName("warm", UnfollowAction.NAME, ForceMergeAction.NAME, new String[] { ForceMergeAction.NAME }); + assertNextActionName("warm", UnfollowAction.NAME, null, new String[] {}); + assertNextActionName("warm", ReadOnlyAction.NAME, AllocateAction.NAME, new String[] { ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME }); assertNextActionName("warm", ReadOnlyAction.NAME, ShrinkAction.NAME, @@ -371,15 +390,27 @@ public void testGetNextActionName() { new String[] { ReadOnlyAction.NAME, AllocateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME }); // Cold Phase - assertNextActionName("cold", SetPriorityAction.NAME, FreezeAction.NAME, new String[]{SetPriorityAction.NAME, FreezeAction.NAME}); + assertNextActionName("cold", SetPriorityAction.NAME, UnfollowAction.NAME, + new String[]{UnfollowAction.NAME, SetPriorityAction.NAME, FreezeAction.NAME}); + assertNextActionName("cold", SetPriorityAction.NAME, FreezeAction.NAME, + new String[]{SetPriorityAction.NAME, FreezeAction.NAME}); assertNextActionName("cold", SetPriorityAction.NAME, AllocateAction.NAME, new String[]{SetPriorityAction.NAME, AllocateAction.NAME}); assertNextActionName("cold", SetPriorityAction.NAME, null, new String[] { SetPriorityAction.NAME }); assertNextActionName("cold", SetPriorityAction.NAME, null, new String[] {}); + + assertNextActionName("cold", UnfollowAction.NAME, AllocateAction.NAME, + new String[] {SetPriorityAction.NAME, AllocateAction.NAME, FreezeAction.NAME}); + assertNextActionName("cold", UnfollowAction.NAME, AllocateAction.NAME, + new String[] {AllocateAction.NAME, FreezeAction.NAME}); + assertNextActionName("cold", UnfollowAction.NAME, FreezeAction.NAME, new String[] {FreezeAction.NAME}); + assertNextActionName("cold", UnfollowAction.NAME, null, new String[] {}); + assertNextActionName("cold", AllocateAction.NAME, null, new String[] { AllocateAction.NAME }); assertNextActionName("cold", AllocateAction.NAME, null, new String[] {}); assertNextActionName("cold", AllocateAction.NAME, null, new String[] {}); assertNextActionName("cold", AllocateAction.NAME, FreezeAction.NAME, FreezeAction.NAME); + assertNextActionName("cold", FreezeAction.NAME, null); assertNextActionName("cold", FreezeAction.NAME, null, AllocateAction.NAME); @@ -393,6 +424,7 @@ public void testGetNextActionName() { // Delete Phase assertNextActionName("delete", DeleteAction.NAME, null, new String[] {}); assertNextActionName("delete", DeleteAction.NAME, null, new String[] { DeleteAction.NAME }); + assertInvalidAction("delete", "foo", new String[] { DeleteAction.NAME }); assertInvalidAction("delete", AllocateAction.NAME, new String[] { DeleteAction.NAME }); assertInvalidAction("delete", ForceMergeAction.NAME, new String[] { DeleteAction.NAME }); @@ -401,6 +433,7 @@ public void testGetNextActionName() { assertInvalidAction("delete", ShrinkAction.NAME, new String[] { DeleteAction.NAME }); assertInvalidAction("delete", FreezeAction.NAME, new String[] { DeleteAction.NAME }); assertInvalidAction("delete", SetPriorityAction.NAME, new String[] { DeleteAction.NAME }); + assertInvalidAction("delete", UnfollowAction.NAME, new String[] { DeleteAction.NAME }); Phase phase = new Phase("foo", TimeValue.ZERO, Collections.emptyMap()); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, @@ -444,6 +477,8 @@ private ConcurrentMap convertActionNamesToActions(Strin return new FreezeAction(); case SetPriorityAction.NAME: return new SetPriorityAction(0); + case UnfollowAction.NAME: + return new UnfollowAction(); } return new DeleteAction(); }).collect(Collectors.toConcurrentMap(LifecycleAction::getWriteableName, Function.identity())); @@ -509,6 +544,8 @@ private LifecycleAction getTestAction(String actionName) { return TEST_FREEZE_ACTION; case SetPriorityAction.NAME: return TEST_PRIORITY_ACTION; + case UnfollowAction.NAME: + return TEST_UNFOLLOW_ACTION; default: throw new IllegalArgumentException("unsupported timeseries phase action [" + actionName + "]"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowActionTests.java new file mode 100644 index 0000000000000..42f299a8aeafd --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowActionTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; + +import java.io.IOException; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class UnfollowActionTests extends AbstractActionTestCase { + + @Override + protected UnfollowAction doParseInstance(XContentParser parser) throws IOException { + return UnfollowAction.parse(parser); + } + + @Override + protected UnfollowAction createTestInstance() { + return new UnfollowAction(); + } + + @Override + protected Reader instanceReader() { + return UnfollowAction::new; + } + + public void testToSteps() { + UnfollowAction action = createTestInstance(); + String phase = randomAlphaOfLengthBetween(1, 10); + StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), + randomAlphaOfLengthBetween(1, 10)); + List steps = action.toSteps(null, phase, nextStepKey); + assertThat(steps, notNullValue()); + assertThat(steps.size(), equalTo(7)); + + StepKey expectedFirstStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForIndexingCompleteStep.NAME); + StepKey expectedSecondStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForFollowShardTasksStep.NAME); + StepKey expectedThirdStepKey = new StepKey(phase, UnfollowAction.NAME, PauseFollowerIndexStep.NAME); + StepKey expectedFourthStepKey = new StepKey(phase, UnfollowAction.NAME, CloseFollowerIndexStep.NAME); + StepKey expectedFifthStepKey = new StepKey(phase, UnfollowAction.NAME, UnfollowFollowIndexStep.NAME); + StepKey expectedSixthStepKey = new StepKey(phase, UnfollowAction.NAME, OpenFollowerIndexStep.NAME); + StepKey expectedSeventhStepKey = new StepKey(phase, UnfollowAction.NAME, WaitForYellowStep.NAME); + + WaitForIndexingCompleteStep firstStep = (WaitForIndexingCompleteStep) steps.get(0); + assertThat(firstStep.getKey(), equalTo(expectedFirstStepKey)); + assertThat(firstStep.getNextStepKey(), equalTo(expectedSecondStepKey)); + + WaitForFollowShardTasksStep secondStep = (WaitForFollowShardTasksStep) steps.get(1); + assertThat(secondStep.getKey(), equalTo(expectedSecondStepKey)); + assertThat(secondStep.getNextStepKey(), equalTo(expectedThirdStepKey)); + + PauseFollowerIndexStep thirdStep = (PauseFollowerIndexStep) steps.get(2); + assertThat(thirdStep.getKey(), equalTo(expectedThirdStepKey)); + assertThat(thirdStep.getNextStepKey(), equalTo(expectedFourthStepKey)); + + CloseFollowerIndexStep fourthStep = (CloseFollowerIndexStep) steps.get(3); + assertThat(fourthStep.getKey(), equalTo(expectedFourthStepKey)); + assertThat(fourthStep.getNextStepKey(), equalTo(expectedFifthStepKey)); + + UnfollowFollowIndexStep fifthStep = (UnfollowFollowIndexStep) steps.get(4); + assertThat(fifthStep.getKey(), equalTo(expectedFifthStepKey)); + assertThat(fifthStep.getNextStepKey(), equalTo(expectedSixthStepKey)); + + OpenFollowerIndexStep sixthStep = (OpenFollowerIndexStep) steps.get(5); + assertThat(sixthStep.getKey(), equalTo(expectedSixthStepKey)); + assertThat(sixthStep.getNextStepKey(), equalTo(expectedSeventhStepKey)); + + WaitForYellowStep seventhStep = (WaitForYellowStep) steps.get(6); + assertThat(seventhStep.getKey(), equalTo(expectedSeventhStepKey)); + assertThat(seventhStep.getNextStepKey(), equalTo(nextStepKey)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStepTests.java new file mode 100644 index 0000000000000..58558c92d2511 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/UnfollowFollowIndexStepTests.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; +import org.mockito.Mockito; + +import java.util.Collections; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class UnfollowFollowIndexStepTests extends AbstractUnfollowIndexStepTestCase { + + @Override + protected UnfollowFollowIndexStep newInstance(Step.StepKey key, Step.StepKey nextKey, Client client) { + return new UnfollowFollowIndexStep(key, nextKey, client); + } + + public void testUnFollow() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocation -> { + UnfollowAction.Request request = (UnfollowAction.Request) invocation.getArguments()[1]; + assertThat(request.getFollowerIndex(), equalTo("follower-index")); + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(client).execute(Mockito.same(UnfollowAction.INSTANCE), Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + UnfollowFollowIndexStep step = new UnfollowFollowIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], is(true)); + assertThat(failure[0], nullValue()); + } + + public void testUnFollowUnfollowFailed() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + // Mock unfollow api call: + Exception error = new RuntimeException(); + Mockito.doAnswer(invocation -> { + UnfollowAction.Request request = (UnfollowAction.Request) invocation.getArguments()[1]; + assertThat(request.getFollowerIndex(), equalTo("follower-index")); + ActionListener listener = (ActionListener) invocation.getArguments()[2]; + listener.onFailure(error); + return null; + }).when(client).execute(Mockito.same(UnfollowAction.INSTANCE), Mockito.any(), Mockito.any()); + + Boolean[] completed = new Boolean[1]; + Exception[] failure = new Exception[1]; + UnfollowFollowIndexStep step = new UnfollowFollowIndexStep(randomStepKey(), randomStepKey(), client); + step.performAction(indexMetadata, null, new AsyncActionStep.Listener() { + @Override + public void onResponse(boolean complete) { + completed[0] = complete; + } + + @Override + public void onFailure(Exception e) { + failure[0] = e; + } + }); + assertThat(completed[0], nullValue()); + assertThat(failure[0], sameInstance(error)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepInfoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepInfoTests.java new file mode 100644 index 0000000000000..483df7632e2a4 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepInfoTests.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.xpack.core.indexlifecycle.WaitForFollowShardTasksStep.Info; +import org.elasticsearch.xpack.core.indexlifecycle.WaitForFollowShardTasksStep.Info.ShardFollowTaskInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class WaitForFollowShardTasksStepInfoTests extends AbstractXContentTestCase { + + private static final ConstructingObjectParser SHARD_FOLLOW_TASK_INFO_PARSER = + new ConstructingObjectParser<>( + "shard_follow_task_info_parser", + args -> new ShardFollowTaskInfo((String) args[0], (Integer) args[1], (Long) args[2], (Long) args[3]) + ); + + static { + SHARD_FOLLOW_TASK_INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), ShardFollowTaskInfo.FOLLOWER_INDEX_FIELD); + SHARD_FOLLOW_TASK_INFO_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTaskInfo.SHARD_ID_FIELD); + SHARD_FOLLOW_TASK_INFO_PARSER.declareLong(ConstructingObjectParser.constructorArg(), + ShardFollowTaskInfo.LEADER_GLOBAL_CHECKPOINT_FIELD); + SHARD_FOLLOW_TASK_INFO_PARSER.declareLong(ConstructingObjectParser.constructorArg(), + ShardFollowTaskInfo.FOLLOWER_GLOBAL_CHECKPOINT_FIELD); + } + + private static final ConstructingObjectParser INFO_PARSER = new ConstructingObjectParser<>( + "info_parser", + args -> { + @SuppressWarnings("unchecked") + Info info = new Info((List) args[0]); + return info; + } + ); + + static { + INFO_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), SHARD_FOLLOW_TASK_INFO_PARSER, + Info.SHARD_FOLLOW_TASKS); + INFO_PARSER.declareString((i, s) -> {}, Info.MESSAGE); + } + + @Override + protected Info createTestInstance() { + int numInfos = randomIntBetween(0, 32); + List shardFollowTaskInfos = new ArrayList<>(numInfos); + for (int i = 0; i < numInfos; i++) { + shardFollowTaskInfos.add(new ShardFollowTaskInfo(randomAlphaOfLength(3), randomIntBetween(0, 10), + randomNonNegativeLong(), randomNonNegativeLong())); + } + return new Info(shardFollowTaskInfos); + } + + @Override + protected Info doParseInstance(XContentParser parser) throws IOException { + return INFO_PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepTests.java new file mode 100644 index 0000000000000..a0ee01a240347 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForFollowShardTasksStepTests.java @@ -0,0 +1,210 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.IsNull.notNullValue; + +public class WaitForFollowShardTasksStepTests extends AbstractStepTestCase { + + @Override + protected WaitForFollowShardTasksStep createRandomInstance() { + StepKey stepKey = randomStepKey(); + StepKey nextStepKey = randomStepKey(); + return new WaitForFollowShardTasksStep(stepKey, nextStepKey, Mockito.mock(Client.class)); + } + + @Override + protected WaitForFollowShardTasksStep mutateInstance(WaitForFollowShardTasksStep instance) { + StepKey key = instance.getKey(); + StepKey nextKey = instance.getNextStepKey(); + + if (randomBoolean()) { + key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } else { + nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } + + return new WaitForFollowShardTasksStep(key, nextKey, instance.getClient()); + } + + @Override + protected WaitForFollowShardTasksStep copyInstance(WaitForFollowShardTasksStep instance) { + return new WaitForFollowShardTasksStep(instance.getKey(), instance.getNextStepKey(), instance.getClient()); + } + + public void testConditionMet() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(2) + .numberOfReplicas(0) + .build(); + Client client = Mockito.mock(Client.class); + List statsResponses = Arrays.asList( + new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(0, 9, 9)), + new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(1, 3, 3)) + ); + mockFollowStatsCall(client, indexMetadata.getIndex().getName(), statsResponses); + + WaitForFollowShardTasksStep step = new WaitForFollowShardTasksStep(randomStepKey(), randomStepKey(), client); + final boolean[] conditionMetHolder = new boolean[1]; + final ToXContentObject[] informationContextHolder = new ToXContentObject[1]; + final Exception[] exceptionHolder = new Exception[1]; + step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + @Override + public void onResponse(boolean conditionMet, ToXContentObject informationContext) { + conditionMetHolder[0] = conditionMet; + informationContextHolder[0] = informationContext; + } + + @Override + public void onFailure(Exception e) { + exceptionHolder[0] = e; + } + }); + + assertThat(conditionMetHolder[0], is(true)); + assertThat(informationContextHolder[0], nullValue()); + assertThat(exceptionHolder[0], nullValue()); + } + + public void testConditionNotMetShardsNotInSync() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(2) + .numberOfReplicas(0) + .build(); + Client client = Mockito.mock(Client.class); + List statsResponses = Arrays.asList( + new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(0, 9, 9)), + new FollowStatsAction.StatsResponse(createShardFollowTaskStatus(1, 8, 3)) + ); + mockFollowStatsCall(client, indexMetadata.getIndex().getName(), statsResponses); + + WaitForFollowShardTasksStep step = new WaitForFollowShardTasksStep(randomStepKey(), randomStepKey(), client); + final boolean[] conditionMetHolder = new boolean[1]; + final ToXContentObject[] informationContextHolder = new ToXContentObject[1]; + final Exception[] exceptionHolder = new Exception[1]; + step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + @Override + public void onResponse(boolean conditionMet, ToXContentObject informationContext) { + conditionMetHolder[0] = conditionMet; + informationContextHolder[0] = informationContext; + } + + @Override + public void onFailure(Exception e) { + exceptionHolder[0] = e; + } + }); + + assertThat(conditionMetHolder[0], is(false)); + assertThat(informationContextHolder[0], notNullValue()); + assertThat(exceptionHolder[0], nullValue()); + WaitForFollowShardTasksStep.Info info = (WaitForFollowShardTasksStep.Info) informationContextHolder[0]; + assertThat(info.getShardFollowTaskInfos().size(), equalTo(1)); + assertThat(info.getShardFollowTaskInfos().get(0).getShardId(), equalTo(1)); + assertThat(info.getShardFollowTaskInfos().get(0).getLeaderGlobalCheckpoint(), equalTo(8L)); + assertThat(info.getShardFollowTaskInfos().get(0).getFollowerGlobalCheckpoint(), equalTo(3L)); + } + + public void testConditionNotMetNotAFollowerIndex() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .numberOfShards(2) + .numberOfReplicas(0) + .build(); + Client client = Mockito.mock(Client.class); + + WaitForFollowShardTasksStep step = new WaitForFollowShardTasksStep(randomStepKey(), randomStepKey(), client); + final boolean[] conditionMetHolder = new boolean[1]; + final ToXContentObject[] informationContextHolder = new ToXContentObject[1]; + final Exception[] exceptionHolder = new Exception[1]; + step.evaluateCondition(indexMetadata, new AsyncWaitStep.Listener() { + @Override + public void onResponse(boolean conditionMet, ToXContentObject informationContext) { + conditionMetHolder[0] = conditionMet; + informationContextHolder[0] = informationContext; + } + + @Override + public void onFailure(Exception e) { + exceptionHolder[0] = e; + } + }); + + assertThat(conditionMetHolder[0], is(true)); + assertThat(informationContextHolder[0], nullValue()); + assertThat(exceptionHolder[0], nullValue()); + Mockito.verifyZeroInteractions(client); + } + + private static ShardFollowNodeTaskStatus createShardFollowTaskStatus(int shardId, long leaderGCP, long followerGCP) { + return new ShardFollowNodeTaskStatus( + "remote", + "leader-index", + "follower-index", + shardId, + leaderGCP, + -1, + followerGCP, + -1, + -1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + Collections.emptyNavigableMap(), + 0, + null + ); + } + + private void mockFollowStatsCall(Client client, String expectedIndexName, List statsResponses) { + Mockito.doAnswer(invocationOnMock -> { + FollowStatsAction.StatsRequest request = (FollowStatsAction.StatsRequest) invocationOnMock.getArguments()[1]; + assertThat(request.indices().length, equalTo(1)); + assertThat(request.indices()[0], equalTo(expectedIndexName)); + + @SuppressWarnings("unchecked") + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), statsResponses)); + return null; + }).when(client).execute(Mockito.eq(FollowStatsAction.INSTANCE), Mockito.any(), Mockito.any()); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForIndexingCompleteStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForIndexingCompleteStepTests.java new file mode 100644 index 0000000000000..41a9c5983a78c --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForIndexingCompleteStepTests.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; +import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; + +import java.util.Collections; + +import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class WaitForIndexingCompleteStepTests extends AbstractStepTestCase { + + @Override + protected WaitForIndexingCompleteStep createRandomInstance() { + StepKey stepKey = randomStepKey(); + StepKey nextStepKey = randomStepKey(); + return new WaitForIndexingCompleteStep(stepKey, nextStepKey); + } + + @Override + protected WaitForIndexingCompleteStep mutateInstance(WaitForIndexingCompleteStep instance) { + StepKey key = instance.getKey(); + StepKey nextKey = instance.getNextStepKey(); + + if (randomBoolean()) { + key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } else { + nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } + + return new WaitForIndexingCompleteStep(key, nextKey); + } + + @Override + protected WaitForIndexingCompleteStep copyInstance(WaitForIndexingCompleteStep instance) { + return new WaitForIndexingCompleteStep(instance.getKey(), instance.getNextStepKey()); + } + + public void testConditionMet() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true")) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().put(indexMetadata, true).build()) + .build(); + + WaitForIndexingCompleteStep step = createRandomInstance(); + ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState); + assertThat(result.isComplete(), is(true)); + assertThat(result.getInfomationContext(), nullValue()); + } + + public void testConditionMetNotAFollowerIndex() { + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().put(indexMetadata, true).build()) + .build(); + + WaitForIndexingCompleteStep step = createRandomInstance(); + ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState); + assertThat(result.isComplete(), is(true)); + assertThat(result.getInfomationContext(), nullValue()); + } + + public void testConditionNotMet() { + Settings.Builder indexSettings = settings(Version.CURRENT); + if (randomBoolean()) { + indexSettings.put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "false"); + } + IndexMetaData indexMetadata = IndexMetaData.builder("follower-index") + .settings(indexSettings) + .putCustom(CCR_METADATA_KEY, Collections.emptyMap()) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().put(indexMetadata, true).build()) + .build(); + + WaitForIndexingCompleteStep step = createRandomInstance(); + ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState); + assertThat(result.isComplete(), is(false)); + assertThat(result.getInfomationContext(), notNullValue()); + WaitForIndexingCompleteStep.IndexingNotCompleteInfo info = + (WaitForIndexingCompleteStep.IndexingNotCompleteInfo) result.getInfomationContext(); + assertThat(info.getMessage(), equalTo("waiting for the [index.lifecycle.indexing_complete] setting to be set to " + + "true on the leader index, it is currently [false]")); + } + + public void testIndexDeleted() { + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().build()) + .build(); + + WaitForIndexingCompleteStep step = createRandomInstance(); + ClusterStateWaitStep.Result result = step.isConditionMet(new Index("this-index-doesnt-exist", "uuid"), clusterState); + assertThat(result.isComplete(), is(false)); + assertThat(result.getInfomationContext(), nullValue()); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForYellowStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForYellowStepTests.java new file mode 100644 index 0000000000000..6c3915d87cde4 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/WaitForYellowStepTests.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.xpack.core.indexlifecycle.Step.StepKey; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.core.IsNull.notNullValue; + +public class WaitForYellowStepTests extends AbstractStepTestCase { + + @Override + protected WaitForYellowStep createRandomInstance() { + StepKey stepKey = randomStepKey(); + StepKey nextStepKey = randomStepKey(); + return new WaitForYellowStep(stepKey, nextStepKey); + } + + @Override + protected WaitForYellowStep mutateInstance(WaitForYellowStep instance) { + StepKey key = instance.getKey(); + StepKey nextKey = instance.getNextStepKey(); + + if (randomBoolean()) { + key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } else { + nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5)); + } + + return new WaitForYellowStep(key, nextKey); + } + + @Override + protected WaitForYellowStep copyInstance(WaitForYellowStep instance) { + return new WaitForYellowStep(instance.getKey(), instance.getNextStepKey()); + } + + public void testConditionMet() { + IndexMetaData indexMetadata = IndexMetaData.builder("former-follower-index") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + ShardRouting shardRouting = + TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.STARTED); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(shardRouting).build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().put(indexMetadata, true).build()) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + + WaitForYellowStep step = new WaitForYellowStep(randomStepKey(), randomStepKey()); + ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState); + assertThat(result.isComplete(), is(true)); + assertThat(result.getInfomationContext(), nullValue()); + } + + public void testConditionNotMet() { + IndexMetaData indexMetadata = IndexMetaData.builder("former-follower-index") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + ShardRouting shardRouting = + TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.INITIALIZING); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()) + .addShard(shardRouting).build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().put(indexMetadata, true).build()) + .routingTable(RoutingTable.builder().add(indexRoutingTable).build()) + .build(); + + WaitForYellowStep step = new WaitForYellowStep(randomStepKey(), randomStepKey()); + ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState); + assertThat(result.isComplete(), is(false)); + WaitForYellowStep.Info info = (WaitForYellowStep.Info) result.getInfomationContext(); + assertThat(info, notNullValue()); + assertThat(info.getMessage(), equalTo("index is red; not all primary shards are active")); + } + + public void testConditionNotMetNoIndexRoutingTable() { + IndexMetaData indexMetadata = IndexMetaData.builder("former-follower-index") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().put(indexMetadata, true).build()) + .routingTable(RoutingTable.builder().build()) + .build(); + + WaitForYellowStep step = new WaitForYellowStep(randomStepKey(), randomStepKey()); + ClusterStateWaitStep.Result result = step.isConditionMet(indexMetadata.getIndex(), clusterState); + assertThat(result.isComplete(), is(false)); + WaitForYellowStep.Info info = (WaitForYellowStep.Info) result.getInfomationContext(); + assertThat(info, notNullValue()); + assertThat(info.getMessage(), equalTo("index is red; no IndexRoutingTable")); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/action/PutLifecycleRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/action/PutLifecycleRequestTests.java index d747e26161234..2c59d9ca5782a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/action/PutLifecycleRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/action/PutLifecycleRequestTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction.Request; import org.junit.Before; @@ -68,7 +69,8 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), - new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new) + new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new) )); } @@ -85,7 +87,8 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); } diff --git a/x-pack/plugin/ilm/qa/multi-cluster/build.gradle b/x-pack/plugin/ilm/qa/multi-cluster/build.gradle new file mode 100644 index 0000000000000..59df733892944 --- /dev/null +++ b/x-pack/plugin/ilm/qa/multi-cluster/build.gradle @@ -0,0 +1,54 @@ +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(':x-pack:plugin:ccr:qa') +} + +task leaderClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +leaderClusterTestCluster { + numNodes = 1 + clusterName = 'leader-cluster' + setting 'xpack.ilm.enabled', 'true' + setting 'xpack.ccr.enabled', 'true' + setting 'xpack.security.enabled', 'false' + setting 'xpack.watcher.enabled', 'false' + setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.ml.enabled', 'false' + setting 'xpack.license.self_generated.type', 'trial' + setting 'indices.lifecycle.poll_interval', '1000ms' +} + +leaderClusterTestRunner { + systemProperty 'tests.target_cluster', 'leader' +} + +task followClusterTest(type: RestIntegTestTask) {} + +followClusterTestCluster { + dependsOn leaderClusterTestRunner + numNodes = 1 + clusterName = 'follow-cluster' + setting 'xpack.ilm.enabled', 'true' + setting 'xpack.ccr.enabled', 'true' + setting 'xpack.security.enabled', 'false' + setting 'xpack.watcher.enabled', 'false' + setting 'xpack.monitoring.enabled', 'false' + setting 'xpack.ml.enabled', 'false' + setting 'xpack.license.self_generated.type', 'trial' + setting 'indices.lifecycle.poll_interval', '1000ms' + setting 'cluster.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" +} + +followClusterTestRunner { + systemProperty 'tests.target_cluster', 'follow' + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + finalizedBy 'leaderClusterTestCluster#stop' +} + +check.dependsOn followClusterTest +unitTest.enabled = false // no unit tests for this module, only the rest integration test diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java new file mode 100644 index 0000000000000..797916c7c405f --- /dev/null +++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java @@ -0,0 +1,285 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.xpack.ccr.ESCCRRestTestCase; + +import java.io.IOException; +import java.util.Map; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class CCRIndexLifecycleIT extends ESCCRRestTestCase { + + private static final Logger LOGGER = LogManager.getLogger(CCRIndexLifecycleIT.class); + + public void testBasicCCRAndILMIntegration() throws Exception { + String indexName = "logs-1"; + + String policyName = "basic-test"; + if ("leader".equals(targetCluster)) { + putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24)); + Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.lifecycle.name", policyName) + .put("index.lifecycle.rollover_alias", "logs") + .build(); + createIndex(indexName, indexSettings, "", "\"logs\": { }"); + ensureGreen(indexName); + } else if ("follow".equals(targetCluster)) { + // Policy with the same name must exist in follower cluster too: + putILMPolicy(policyName, "50GB", null, TimeValue.timeValueHours(7*24)); + followIndex(indexName, indexName); + // Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster: + client().performRequest(new Request("PUT", "/" + indexName + "/_alias/logs")); + + try (RestClient leaderClient = buildLeaderClient()) { + index(leaderClient, indexName, "1"); + assertDocumentExists(leaderClient, indexName, "1"); + + assertBusy(() -> { + assertDocumentExists(client(), indexName, "1"); + // Sanity check that following_index setting has been set, so that we can verify later that this setting has been unset: + assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), equalTo("true")); + + assertILMPolicy(leaderClient, indexName, policyName, "hot"); + assertILMPolicy(client(), indexName, policyName, "hot"); + }); + + updateIndexSettings(leaderClient, indexName, Settings.builder() + .put("index.lifecycle.indexing_complete", true) + .build() + ); + + assertBusy(() -> { + // Ensure that 'index.lifecycle.indexing_complete' is replicated: + assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true")); + assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true")); + + assertILMPolicy(leaderClient, indexName, policyName, "warm"); + assertILMPolicy(client(), indexName, policyName, "warm"); + + // ILM should have placed both indices in the warm phase and there these indices are read-only: + assertThat(getIndexSetting(leaderClient, indexName, "index.blocks.write"), equalTo("true")); + assertThat(getIndexSetting(client(), indexName, "index.blocks.write"), equalTo("true")); + // ILM should have unfollowed the follower index, so the following_index setting should have been removed: + // (this controls whether the follow engine is used) + assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), nullValue()); + }); + } + } else { + fail("unexpected target cluster [" + targetCluster + "]"); + } + } + + public void testCcrAndIlmWithRollover() throws Exception { + String alias = "metrics"; + String indexName = "metrics-000001"; + String nextIndexName = "metrics-000002"; + String policyName = "rollover-test"; + + if ("leader".equals(targetCluster)) { + // Create a policy on the leader + putILMPolicy(policyName, null, 1, null); + Request templateRequest = new Request("PUT", "_template/my_template"); + Settings indexSettings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.lifecycle.name", policyName) + .put("index.lifecycle.rollover_alias", alias) + .build(); + templateRequest.setJsonEntity("{\"index_patterns\": [\"metrics-*\"], \"settings\": " + Strings.toString(indexSettings) + "}"); + assertOK(client().performRequest(templateRequest)); + } else if ("follow".equals(targetCluster)) { + // Policy with the same name must exist in follower cluster too: + putILMPolicy(policyName, null, 1, null); + + // Set up an auto-follow pattern + Request createAutoFollowRequest = new Request("PUT", "/_ccr/auto_follow/my_auto_follow_pattern"); + createAutoFollowRequest.setJsonEntity("{\"leader_index_patterns\": [\"metrics-*\"], " + + "\"remote_cluster\": \"leader_cluster\", \"read_poll_timeout\": \"1000ms\"}"); + assertOK(client().performRequest(createAutoFollowRequest)); + + try (RestClient leaderClient = buildLeaderClient()) { + // Create an index on the leader using the template set up above + Request createIndexRequest = new Request("PUT", "/" + indexName); + createIndexRequest.setJsonEntity("{" + + "\"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}}, " + + "\"aliases\": {\"" + alias + "\": {\"is_write_index\": true}} }"); + assertOK(leaderClient.performRequest(createIndexRequest)); + // Check that the new index is creeg + Request checkIndexRequest = new Request("GET", "/_cluster/health/" + indexName); + checkIndexRequest.addParameter("wait_for_status", "green"); + checkIndexRequest.addParameter("timeout", "70s"); + checkIndexRequest.addParameter("level", "shards"); + assertOK(leaderClient.performRequest(checkIndexRequest)); + + // Check that it got replicated to the follower + assertBusy(() -> assertTrue(indexExists(indexName))); + + // Aliases are not copied from leader index, so we need to add that for the rollover action in follower cluster: + client().performRequest(new Request("PUT", "/" + indexName + "/_alias/" + alias)); + + index(leaderClient, indexName, "1"); + assertDocumentExists(leaderClient, indexName, "1"); + + assertBusy(() -> { + assertDocumentExists(client(), indexName, "1"); + // Sanity check that following_index setting has been set, so that we can verify later that this setting has been unset: + assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), equalTo("true")); + }); + + // Wait for the index to roll over on the leader + assertBusy(() -> { + assertOK(leaderClient.performRequest(new Request("HEAD", "/" + nextIndexName))); + assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true")); + + }); + + assertBusy(() -> { + // Wait for the next index should have been created on the leader + assertOK(leaderClient.performRequest(new Request("HEAD", "/" + nextIndexName))); + // And the old index should have a write block and indexing complete set + assertThat(getIndexSetting(leaderClient, indexName, "index.blocks.write"), equalTo("true")); + assertThat(getIndexSetting(leaderClient, indexName, "index.lifecycle.indexing_complete"), equalTo("true")); + + }); + + assertBusy(() -> { + // Wait for the setting to get replicated to the follower + assertThat(getIndexSetting(client(), indexName, "index.lifecycle.indexing_complete"), equalTo("true")); + }); + + assertBusy(() -> { + // ILM should have unfollowed the follower index, so the following_index setting should have been removed: + // (this controls whether the follow engine is used) + assertThat(getIndexSetting(client(), indexName, "index.xpack.ccr.following_index"), nullValue()); + // The next index should have been created on the follower as well + indexExists(nextIndexName); + }); + + assertBusy(() -> { + // And the previously-follower index should be in the warm phase + assertILMPolicy(client(), indexName, policyName, "warm"); + }); + + // Clean up + leaderClient.performRequest(new Request("DELETE", "/_template/my_template")); + } + } else { + fail("unexpected target cluster [" + targetCluster + "]"); + } + } + + private static void putILMPolicy(String name, String maxSize, Integer maxDocs, TimeValue maxAge) throws IOException { + final Request request = new Request("PUT", "_ilm/policy/" + name); + XContentBuilder builder = jsonBuilder(); + builder.startObject(); + { + builder.startObject("policy"); + { + builder.startObject("phases"); + { + builder.startObject("hot"); + { + builder.startObject("actions"); + { + builder.startObject("rollover"); + if (maxSize != null) { + builder.field("max_size", maxSize); + } + if (maxAge != null) { + builder.field("max_age", maxAge); + } + if (maxDocs != null) { + builder.field("max_docs", maxDocs); + } + builder.endObject(); + } + { + builder.startObject("unfollow"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject("warm"); + { + builder.startObject("actions"); + { + builder.startObject("readonly"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + builder.startObject("delete"); + { + builder.field("min_age", "7d"); + builder.startObject("actions"); + { + builder.startObject("delete"); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + request.setJsonEntity(Strings.toString(builder)); + assertOK(client().performRequest(request)); + } + + private static void assertILMPolicy(RestClient client, String index, String policy, String expectedPhase) throws IOException { + final Request request = new Request("GET", "/" + index + "/_ilm/explain"); + Map response = toMap(client.performRequest(request)); + LOGGER.info("response={}", response); + Map explanation = (Map) ((Map) response.get("indices")).get(index); + assertThat(explanation.get("managed"), is(true)); + assertThat(explanation.get("policy"), equalTo(policy)); + assertThat(explanation.get("phase"), equalTo(expectedPhase)); + } + + private static void updateIndexSettings(RestClient client, String index, Settings settings) throws IOException { + final Request request = new Request("PUT", "/" + index + "/_settings"); + request.setJsonEntity(Strings.toString(settings)); + assertOK(client.performRequest(request)); + } + + private static Object getIndexSetting(RestClient client, String index, String setting) throws IOException { + Request request = new Request("GET", "/" + index + "/_settings"); + request.addParameter("flat_settings", "true"); + Map response = toMap(client.performRequest(request)); + Map settings = (Map) ((Map) response.get(index)).get("settings"); + return settings.get(setting); + } + + private static void assertDocumentExists(RestClient client, String index, String id) throws IOException { + Request request = new Request("HEAD", "/" + index + "/_doc/" + id); + Response response = client.performRequest(request); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + } + +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 0088b7fde1cba..baa1d8facd958 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -47,6 +47,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; import org.elasticsearch.xpack.core.indexlifecycle.action.DeleteLifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction; @@ -161,7 +162,8 @@ public List getNa new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) ); } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java index 5ac01f4753012..790dd5de632e6 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.SetPriorityAction; import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction; import java.io.IOException; import java.util.ArrayList; @@ -85,7 +86,8 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new NamedWriteableRegistry.Entry(LifecycleAction.class, RolloverAction.NAME, RolloverAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, ShrinkAction.NAME, ShrinkAction::new), new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new), - new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new) + new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new), + new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new) )); } @@ -102,7 +104,8 @@ protected NamedXContentRegistry xContentRegistry() { new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse) + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse) )); return new NamedXContentRegistry(entries); }