diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java
index 90b9d15f21b85..542ac156a9871 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java
@@ -30,6 +30,16 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client) {
         super(key, nextStepKey, client);
     }
 
+    /**
+     * Reports this step as retryable; it unconditionally returns {@code true}.
+     * NOTE(review): presumably this is what lets ILM re-run a failed rollover automatically
+     * instead of leaving the index on the error step - confirm against the runner's retry logic.
+     */
+    @Override
+    public boolean isRetryable() {
+        return true;
+    }
+
     @Override
     public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
                               ClusterStateObserver observer, Listener listener) {
diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java
index 83c1dcc6464b7..9157053b0cfad 100644
--- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java
+++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java
@@ -940,69 +940,147 @@ public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception {
         assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(TerminalPolicyStep.KEY)));
     }
 
-    public void testILMRolloverOnManuallyRolledIndex() throws Exception {
-        String originalIndex = index + "-000001";
-        String secondIndex = index + "-000002";
-        String thirdIndex = index + "-000003";
-
-        // Set up a policy with rollover
-        createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L));
-        Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
-        createIndexTemplate.setJsonEntity("{" +
-            "\"index_patterns\": [\""+ index + "-*\"], \n" +
-            " \"settings\": {\n" +
-            " \"number_of_shards\": 1,\n" +
-            " \"number_of_replicas\": 0,\n" +
-            " \"index.lifecycle.name\": \"" + policy+ "\", \n" +
-            " \"index.lifecycle.rollover_alias\": \"alias\"\n" +
-            " }\n" +
-            "}");
-        client().performRequest(createIndexTemplate);
-
-        createIndexWithSettings(
-            originalIndex,
-            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
-                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0),
-            true
-        );
-
-        // Index a document
-        index(client(), originalIndex, "1", "foo", "bar");
-        Request refreshOriginalIndex = new Request("POST", "/" + originalIndex + "/_refresh");
-        client().performRequest(refreshOriginalIndex);
-
-        // Manual rollover
-        Request rolloverRequest = new Request("POST", "/alias/_rollover");
-        rolloverRequest.setJsonEntity("{\n" +
-            " \"conditions\": {\n" +
-            " \"max_docs\": \"1\"\n" +
-            " }\n" +
-            "}"
-        );
-        client().performRequest(rolloverRequest);
-        assertBusy(() -> assertTrue(indexExists(secondIndex)));
-
-        // Index another document into the original index so the ILM rollover policy condition is met
-        index(client(), originalIndex, "2", "foo", "bar");
-        client().performRequest(refreshOriginalIndex);
-
-        // Wait for the rollover policy to execute
-        assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
-
-        // ILM should manage the second index after attempting (and skipping) rolling the original index
-        assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", true)));
-
-        // index some documents to trigger an ILM rollover
-        index(client(), "alias", "1", "foo", "bar");
-        index(client(), "alias", "2", "foo", "bar");
-        index(client(), "alias", "3", "foo", "bar");
-        Request refreshSecondIndex = new Request("POST", "/" + secondIndex + "/_refresh");
-        client().performRequest(refreshSecondIndex).getStatusLine();
-
-        // ILM should rollover the second index even though it skipped the first one
-        assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY)));
-        assertBusy(() -> assertTrue(indexExists(thirdIndex)));
-    }
+    public void testILMRolloverOnManuallyRolledIndex() throws Exception {
+        String originalIndex = index + "-000001";
+        String secondIndex = index + "-000002";
+        String thirdIndex = index + "-000003";
+
+        // Set up a policy with rollover
+        createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L));
+        Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
+        createIndexTemplate.setJsonEntity("{" +
+            "\"index_patterns\": [\"" + index + "-*\"], \n" +
+            " \"settings\": {\n" +
+            " \"number_of_shards\": 1,\n" +
+            " \"number_of_replicas\": 0,\n" +
+            " \"index.lifecycle.name\": \"" + policy + "\", \n" +
+            " \"index.lifecycle.rollover_alias\": \"alias\"\n" +
+            " }\n" +
+            "}");
+        client().performRequest(createIndexTemplate);
+
+        createIndexWithSettings(
+            originalIndex,
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0),
+            true
+        );
+
+        // Index a document
+        index(client(), originalIndex, "1", "foo", "bar");
+        Request refreshOriginalIndex = new Request("POST", "/" + originalIndex + "/_refresh");
+        client().performRequest(refreshOriginalIndex);
+
+        // Manual rollover
+        Request rolloverRequest = new Request("POST", "/alias/_rollover");
+        rolloverRequest.setJsonEntity("{\n" +
+            " \"conditions\": {\n" +
+            " \"max_docs\": \"1\"\n" +
+            " }\n" +
+            "}"
+        );
+        client().performRequest(rolloverRequest);
+        assertBusy(() -> assertTrue(indexExists(secondIndex)));
+
+        // Index another document into the original index so the ILM rollover policy condition is met
+        index(client(), originalIndex, "2", "foo", "bar");
+        client().performRequest(refreshOriginalIndex);
+
+        // Wait for the rollover policy to execute
+        assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY)));
+
+        // ILM should manage the second index after attempting (and skipping) rolling the original index
+        assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", false)));
+
+        // index some documents to trigger an ILM rollover
+        index(client(), "alias", "1", "foo", "bar");
+        index(client(), "alias", "2", "foo", "bar");
+        index(client(), "alias", "3", "foo", "bar");
+        Request refreshSecondIndex = new Request("POST", "/" + secondIndex + "/_refresh");
+        client().performRequest(refreshSecondIndex).getStatusLine();
+
+        // ILM should rollover the second index even though it skipped the first one
+        assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY)));
+        assertBusy(() -> assertTrue(indexExists(thirdIndex)));
+    }
+
+    /**
+     * Verifies that a failing {@code attempt-rollover} step keeps being retried and completes once the cause of
+     * the failure is removed: the rollover target ({@code -000002}) is created up front so the rollover of
+     * {@code -000001} fails, and the policy is expected to reach the terminal step after that index is deleted.
+     */
+    public void testRolloverStepRetriesUntilRolledOverIndexIsDeleted() throws Exception {
+        String index = this.index + "-000001";
+        String rolledIndex = this.index + "-000002";
+
+        createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null));
+
+        // create the rolled index so the rollover of the first index fails
+        createIndexWithSettings(
+            rolledIndex,
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"),
+            false
+        );
+
+        createIndexWithSettings(
+            index,
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(LifecycleSettings.LIFECYCLE_NAME, policy)
+                .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"),
+            true
+        );
+
+        assertBusy(() -> assertThat((Integer) explainIndex(index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), 30,
+            TimeUnit.SECONDS);
+
+        Request moveToStepRequest = new Request("POST", "_ilm/move/" + index);
+        moveToStepRequest.setJsonEntity("{\n" +
+            " \"current_step\": {\n" +
+            " \"phase\": \"hot\",\n" +
+            " \"action\": \"rollover\",\n" +
+            " \"name\": \"check-rollover-ready\"\n" +
+            " },\n" +
+            " \"next_step\": {\n" +
+            " \"phase\": \"hot\",\n" +
+            " \"action\": \"rollover\",\n" +
+            " \"name\": \"attempt-rollover\"\n" +
+            " }\n" +
+            "}");
+
+        // Using {@link #waitUntil} here as ILM moves back and forth between the {@link WaitForRolloverReadyStep} step and
+        // {@link org.elasticsearch.xpack.core.ilm.ErrorStep} in order to retry the failing step. As {@link #assertBusy}
+        // increases the wait time between calls exponentially, we might miss the window where the policy is on
+        // {@link WaitForRolloverReadyStep} and the move to `attempt-rollover` request will not be successful.
+        assertTrue("the move to [attempt-rollover] was not accepted within 30s", waitUntil(() -> {
+            try {
+                return client().performRequest(moveToStepRequest).getStatusLine().getStatusCode() == 200;
+            } catch (IOException e) {
+                return false;
+            }
+        }, 30, TimeUnit.SECONDS));
+
+        // Similar to above, using {@link #waitUntil} as we want to make sure the `attempt-rollover` step started failing and is being
+        // retried (which means ILM moves back and forth between the `attempt-rollover` step and the `error` step)
+        assertTrue("[attempt-rollover] was not seen failing and being retried within 30s", waitUntil(() -> {
+            try {
+                Map<String, Object> explainIndexResponse = explainIndex(index);
+                String step = (String) explainIndexResponse.get("step");
+                Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD);
+                return step != null && step.equals("attempt-rollover") && retryCount != null && retryCount >= 1;
+            } catch (IOException e) {
+                return false;
+            }
+        }, 30, TimeUnit.SECONDS));
+
+        deleteIndex(rolledIndex);
+
+        // the rollover step should eventually succeed
+        assertBusy(() -> assertThat(indexExists(rolledIndex), is(true)));
+        assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY)));
+    }
 
     public void testHistoryIsWrittenWithSuccess() throws Exception {
         String index = "index";
diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java
index 736d5decc1123..8e892a351d655 100644
--- 
a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java
+++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java
@@ -203,6 +203,33 @@ public void onFailure(String source, Exception e) {
                     logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed",
                         failedStep.getKey().getName(), index), e);
                 }
+
+                /**
+                 * After the retry task's cluster state has been applied, runs the step the index was moved back to if it is an async action.
+                 */
+                @Override
+                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                    if (oldState.equals(newState)) {
+                        // the retry task did not change the cluster state, so there is nothing new to execute
+                        return;
+                    }
+                    IndexMetaData newIndexMeta = newState.metaData().index(index);
+                    if (newIndexMeta == null) {
+                        // the index is absent from the new state; bail out before newIndexMeta is handed to any helper
+                        return;
+                    }
+                    Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta);
+                    if (indexMetaCurrentStep == null) {
+                        // NOTE(review): defensive - getCurrentStep is not visible here; skip rather than dereference a null step
+                        return;
+                    }
+                    StepKey stepKey = indexMetaCurrentStep.getKey();
+                    if (stepKey != null && TerminalPolicyStep.KEY.equals(stepKey) == false) {
+                        logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " +
+                            "retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey);
+                        maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
+                    }
+                }
             });
         } else {
             logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index);