From 8bee5f4cb58a1242cc2ef4bc0317dae6c8be49d3 Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Fri, 3 Jan 2020 12:24:03 +0200 Subject: [PATCH] ILM retryable async action steps (#50522) This adds support for retrying AsyncActionSteps by triggering the async step after ILM was moved back on the failed step (the async step we'll be attempting to run after the cluster state reflects ILM being moved back on the failed step). This also marks the RolloverStep as retryable and adds an integration test where the RolloverStep is failing to execute as the rolled over index already exists to test that the async action RolloverStep is retried until the rolled over index is deleted. --- .../xpack/core/ilm/RolloverStep.java | 5 + .../ilm/TimeSeriesLifecycleActionsIT.java | 199 ++++++++++++------ .../xpack/ilm/IndexLifecycleRunner.java | 14 ++ 3 files changed, 155 insertions(+), 63 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java index 90b9d15f21b85..542ac156a9871 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RolloverStep.java @@ -30,6 +30,11 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client) { super(key, nextStepKey, client); } + @Override + public boolean isRetryable() { + return true; + } + @Override public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, ClusterStateObserver observer, Listener listener) { diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java index 83c1dcc6464b7..9157053b0cfad 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java @@ -940,69 +940,142 @@ public void testILMRolloverRetriesOnReadOnlyBlock() throws Exception { assertBusy(() -> assertThat(getStepKeyForIndex(firstIndex), equalTo(TerminalPolicyStep.KEY))); } - public void testILMRolloverOnManuallyRolledIndex() throws Exception { - String originalIndex = index + "-000001"; - String secondIndex = index + "-000002"; - String thirdIndex = index + "-000003"; - - // Set up a policy with rollover - createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L)); - Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); - createIndexTemplate.setJsonEntity("{" + - "\"index_patterns\": [\""+ index + "-*\"], \n" + - " \"settings\": {\n" + - " \"number_of_shards\": 1,\n" + - " \"number_of_replicas\": 0,\n" + - " \"index.lifecycle.name\": \"" + policy+ "\", \n" + - " \"index.lifecycle.rollover_alias\": \"alias\"\n" + - " }\n" + - "}"); - client().performRequest(createIndexTemplate); - - createIndexWithSettings( - originalIndex, - Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), - true - ); - - // Index a document - index(client(), originalIndex, "1", "foo", "bar"); - Request refreshOriginalIndex = new Request("POST", "/" + originalIndex + "/_refresh"); - client().performRequest(refreshOriginalIndex); - - // Manual rollover - Request rolloverRequest = new Request("POST", "/alias/_rollover"); - rolloverRequest.setJsonEntity("{\n" + - " \"conditions\": {\n" + - " \"max_docs\": \"1\"\n" + - " }\n" + - "}" - ); - client().performRequest(rolloverRequest); - assertBusy(() -> assertTrue(indexExists(secondIndex))); - - // Index another document into the original index so the ILM rollover policy condition is met - index(client(), originalIndex, "2", "foo", "bar"); - client().performRequest(refreshOriginalIndex); - - // Wait for the rollover policy to execute - assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY))); - - // ILM should manage the second index after attempting (and skipping) rolling the original index - assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", true))); - - // index some documents to trigger an ILM rollover - index(client(), "alias", "1", "foo", "bar"); - index(client(), "alias", "2", "foo", "bar"); - index(client(), "alias", "3", "foo", "bar"); - Request refreshSecondIndex = new Request("POST", "/" + secondIndex + "/_refresh"); - client().performRequest(refreshSecondIndex).getStatusLine(); - - // ILM should rollover the second index even though it skipped the first one - assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY))); - assertBusy(() -> assertTrue(indexExists(thirdIndex))); - } + public void testILMRolloverOnManuallyRolledIndex() throws Exception { + String originalIndex = index + "-000001"; + String secondIndex = index + "-000002"; + String thirdIndex = index + "-000003"; + + // Set up a policy with rollover + createNewSingletonPolicy("hot", new RolloverAction(null, null, 2L)); + Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes"); + createIndexTemplate.setJsonEntity("{" + + "\"index_patterns\": [\"" + index + "-*\"], \n" + + " \"settings\": {\n" + + " \"number_of_shards\": 1,\n" + + " \"number_of_replicas\": 0,\n" + + " \"index.lifecycle.name\": \"" + policy + "\", \n" + + " \"index.lifecycle.rollover_alias\": \"alias\"\n" + + " }\n" + + "}"); + client().performRequest(createIndexTemplate); + + createIndexWithSettings( + originalIndex, + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0), + true + ); + + // Index a document + index(client(), originalIndex, "1", "foo", "bar"); + Request refreshOriginalIndex = new Request("POST", "/" + originalIndex + "/_refresh"); + client().performRequest(refreshOriginalIndex); + + // Manual rollover + Request rolloverRequest = new Request("POST", "/alias/_rollover"); + rolloverRequest.setJsonEntity("{\n" + + " \"conditions\": {\n" + + " \"max_docs\": \"1\"\n" + + " }\n" + + "}" + ); + client().performRequest(rolloverRequest); + assertBusy(() -> assertTrue(indexExists(secondIndex))); + + // Index another document into the original index so the ILM rollover policy condition is met + index(client(), originalIndex, "2", "foo", "bar"); + client().performRequest(refreshOriginalIndex); + + // Wait for the rollover policy to execute + assertBusy(() -> assertThat(getStepKeyForIndex(originalIndex), equalTo(TerminalPolicyStep.KEY))); + + // ILM should manage the second index after attempting (and skipping) rolling the original index + assertBusy(() -> assertTrue((boolean) explainIndex(secondIndex).getOrDefault("managed", true))); + + // index some documents to trigger an ILM rollover + index(client(), "alias", "1", "foo", "bar"); + index(client(), "alias", "2", "foo", "bar"); + index(client(), "alias", "3", "foo", "bar"); + Request refreshSecondIndex = new Request("POST", "/" + secondIndex + "/_refresh"); + client().performRequest(refreshSecondIndex).getStatusLine(); + + // ILM should rollover the second index even though it skipped the first one + assertBusy(() -> assertThat(getStepKeyForIndex(secondIndex), equalTo(TerminalPolicyStep.KEY))); + assertBusy(() -> assertTrue(indexExists(thirdIndex))); + } + + public void testRolloverStepRetriesUntilRolledOverIndexIsDeleted() throws Exception { + String index = this.index + "-000001"; + String rolledIndex = this.index + "-000002"; + + createNewSingletonPolicy("hot", new RolloverAction(null, TimeValue.timeValueSeconds(1), null)); + + // create the rolled index so the rollover of the first index fails + createIndexWithSettings( + rolledIndex, + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"), + false + ); + + createIndexWithSettings( + index, + Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(LifecycleSettings.LIFECYCLE_NAME, policy) + .put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"), + true + ); + + assertBusy(() -> assertThat((Integer) explainIndex(index).get(FAILED_STEP_RETRY_COUNT_FIELD), greaterThanOrEqualTo(1)), 30, + TimeUnit.SECONDS); + + Request moveToStepRequest = new Request("POST", "_ilm/move/" + index); + moveToStepRequest.setJsonEntity("{\n" + + " \"current_step\": {\n" + + " \"phase\": \"hot\",\n" + + " \"action\": \"rollover\",\n" + + " \"name\": \"check-rollover-ready\"\n" + + " },\n" + + " \"next_step\": {\n" + + " \"phase\": \"hot\",\n" + + " \"action\": \"rollover\",\n" + + " \"name\": \"attempt-rollover\"\n" + + " }\n" + + "}"); + + // Using {@link #waitUntil} here as ILM moves back and forth between the {@link WaitForRolloverReadyStep} step and + // {@link org.elasticsearch.xpack.core.ilm.ErrorStep} in order to retry the failing step. As {@link #assertBusy} + // increases the wait time between calls exponentially, we might miss the window where the policy is on + // {@link WaitForRolloverReadyStep} and the move to `attempt-rollover` request will not be successful. + waitUntil(() -> { + try { + return client().performRequest(moveToStepRequest).getStatusLine().getStatusCode() == 200; + } catch (IOException e) { + return false; + } + }, 30, TimeUnit.SECONDS); + + // Similar to above, using {@link #waitUntil} as we want to make sure the `attempt-rollover` step started failing and is being + // retried (which means ILM moves back and forth between the `attempt-rollover` step and the `error` step) + waitUntil(() -> { + try { + Map explainIndexResponse = explainIndex(index); + String step = (String) explainIndexResponse.get("step"); + Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD); + return step != null && step.equals("attempt-rollover") && retryCount != null && retryCount >= 1; + } catch (IOException e) { + return false; + } + }, 30, TimeUnit.SECONDS); + + deleteIndex(rolledIndex); + + // the rollover step should eventually succeed + assertBusy(() -> assertThat(indexExists(rolledIndex), is(true))); + assertBusy(() -> assertThat(getStepKeyForIndex(index), equalTo(TerminalPolicyStep.KEY))); + } public void testHistoryIsWrittenWithSuccess() throws Exception { String index = "index"; diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java index 736d5decc1123..8e892a351d655 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java @@ -203,6 +203,20 @@ public void onFailure(String source, Exception e) { logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed", failedStep.getKey().getName(), index), e); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (oldState.equals(newState) == false) { + IndexMetaData newIndexMeta = newState.metaData().index(index); + Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta); + StepKey stepKey = indexMetaCurrentStep.getKey(); + if (stepKey != null && stepKey != TerminalPolicyStep.KEY && newIndexMeta != null) { + logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " + + "retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey); + maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey); + } + } + } }); } else { logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index);