From f37b3c9fb2cd45ee3d3b5f81db3b70455b11fa80 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 9 Jan 2019 16:50:02 -0700 Subject: [PATCH 1/8] Use ILM for Watcher history deletion This commit adds an index lifecycle policy for the `.watch-history-*` indices. This policy is automatically used for all new watch history indices. This does not yet remove the automatic cleanup that the monitoring plugin does for the .watch-history indices, and it does not touch the `xpack.watcher.history.cleaner_service.enabled` setting. Relates to #32041 --- .../indexlifecycle/LifecyclePolicyUtils.java | 76 ++++++++++++++++ .../main/resources/watch-history-policy.json | 10 +++ .../src/main/resources/watch-history.json | 1 + .../action/RestPutLifecycleAction.java | 11 +-- .../action/TransportPutLifecycleAction.java | 7 +- .../qa/native-multi-node-tests/build.gradle | 1 + x-pack/plugin/watcher/build.gradle | 1 + .../elasticsearch/xpack/watcher/Watcher.java | 2 +- .../support/WatcherIndexTemplateRegistry.java | 76 +++++++++++++++- .../WatcherIndexTemplateRegistryTests.java | 87 ++++++++++++++++++- .../AbstractWatcherIntegrationTestCase.java | 3 + 11 files changed, 265 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java create mode 100644 x-pack/plugin/core/src/main/resources/watch-history-policy.json diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java new file mode 100644 index 0000000000000..c3752e3927b41 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/LifecyclePolicyUtils.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.indexlifecycle; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.NotXContentException; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * A utility class used for loading index lifecycle policies from the resource classpath + */ +public class LifecyclePolicyUtils { + + private LifecyclePolicyUtils() {}; + + /** + * Loads a built-in index lifecycle policy and returns its source. + */ + public static LifecyclePolicy loadPolicy(String name, String resource, NamedXContentRegistry xContentRegistry) { + try { + BytesReference source = load(resource); + validate(source); + + try (XContentParser parser = XContentType.JSON.xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, source.utf8ToString())) { + return LifecyclePolicy.parse(parser, name); + } + } catch (Exception e) { + throw new IllegalArgumentException("unable to load policy [" + name + "] from [" + resource + "]", e); + } + } + + /** + * Loads a resource from the classpath and returns it as a {@link BytesReference} + */ + private static BytesReference load(String name) throws IOException { + try (InputStream is = LifecyclePolicyUtils.class.getResourceAsStream(name)) { + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + Streams.copy(is, out); + return new BytesArray(out.toByteArray()); + } + } + } + + /** + * Parses and validates that the source is not empty. + */ + private static void validate(BytesReference source) { + if (source == null) { + throw new ElasticsearchParseException("policy must not be null"); + } + + try { + XContentHelper.convertToMap(source, false, XContentType.JSON).v2(); + } catch (NotXContentException e) { + throw new ElasticsearchParseException("policy must not be empty"); + } catch (Exception e) { + throw new ElasticsearchParseException("invalid policy", e); + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/watch-history-policy.json b/x-pack/plugin/core/src/main/resources/watch-history-policy.json new file mode 100644 index 0000000000000..a6f4ab34cf150 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/watch-history-policy.json @@ -0,0 +1,10 @@ +{ + "phases": { + "delete": { + "min_age": "30d", + "actions": { + "delete": {} + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/watch-history.json b/x-pack/plugin/core/src/main/resources/watch-history.json index 9a4a96409b043..cd3a116c778c1 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history.json +++ b/x-pack/plugin/core/src/main/resources/watch-history.json @@ -5,6 +5,7 @@ "index.number_of_shards": 1, "index.number_of_replicas": 0, "index.auto_expand_replicas": "0-1", + "index.lifecycle.name": "watch-history-policy", "index.format": 6 }, "mappings": { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java index 586c3c683264e..aad85426fc338 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/RestPutLifecycleAction.java @@ -32,11 +32,12 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String lifecycleName = restRequest.param("name"); - XContentParser parser = restRequest.contentParser(); - PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser); - putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout())); - putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout())); + try (XContentParser parser = restRequest.contentParser()) { + PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser); + putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout())); + putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout())); - return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel)); + return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel)); + } } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java index da51bb68962d7..61f9be3558fa7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/indexlifecycle/action/TransportPutLifecycleAction.java @@ -87,7 +87,12 @@ public ClusterState execute(ClusterState currentState) throws Exception { SortedMap newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas()); LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(request.getPolicy(), filteredHeaders, nextVersion, Instant.now().toEpochMilli()); - newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata); + LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata); + if (oldPolicy == null) { + logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName()); + } else { + logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); + } IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, OperationMode.RUNNING); newState.metaData(MetaData.builder(currentState.getMetaData()) .putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index 188086afcf14a..95a2f8db2c49c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -36,6 +36,7 @@ integTestCluster { dependsOn copyKeyCerts setting 'xpack.security.enabled', 'true' setting 'xpack.ml.enabled', 'true' + setting 'xpack.watcher.enabled', 'false' setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE' setting 'xpack.monitoring.enabled', 'false' setting 'xpack.security.authc.token.enabled', 'true' diff --git a/x-pack/plugin/watcher/build.gradle b/x-pack/plugin/watcher/build.gradle index fe6885b1c9d72..dd26db984cdf5 100644 --- a/x-pack/plugin/watcher/build.gradle +++ b/x-pack/plugin/watcher/build.gradle @@ -31,6 +31,7 @@ dependencies { compileOnly project(path: ':plugins:transport-nio', configuration: 'runtime') testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') + testCompile "org.elasticsearch.plugin:x-pack-ilm:${version}" // watcher deps compile 'com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:r239' diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 7b17d7f9973d1..d5eea54a80c85 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -270,7 +270,7 @@ public Collection createComponents(Client client, ClusterService cluster throw new UncheckedIOException(e); } - new WatcherIndexTemplateRegistry(clusterService, threadPool, client); + new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry); // http client httpClient = new HttpClient(settings, getSslService(), cryptoService, clusterService); diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java index 3258a7a481bb1..d1c12c1fe7479 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java @@ -18,13 +18,20 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.XPackClient; +import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils; +import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; import org.elasticsearch.xpack.core.template.TemplateUtils; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; import java.nio.charset.StandardCharsets; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -46,17 +53,23 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener { TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES }; + public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-policy", "/watch-history-policy.json"); + private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class); private final Client client; private final ThreadPool threadPool; private final TemplateConfig[] indexTemplates; + private final NamedXContentRegistry xContentRegistry; private final ConcurrentMap templateCreationsInProgress = new ConcurrentHashMap<>(); + private final AtomicBoolean historyPolicyCreationInProgress = new AtomicBoolean(); - public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) { + public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client, + NamedXContentRegistry xContentRegistry) { this.client = client; this.threadPool = threadPool; this.indexTemplates = TEMPLATE_CONFIGS; + this.xContentRegistry = xContentRegistry; clusterService.addListener(this); } @@ -82,6 +95,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (event.localNodeMaster() || localNodeVersionAfterMaster) { addTemplatesIfMissing(state); + addIndexLifecyclePolicyIfMissing(state); } } @@ -127,6 +141,54 @@ public void onFailure(Exception e) { }); } + // Package visible for testing + LifecyclePolicy loadWatcherHistoryPolicy() { + return LifecyclePolicyUtils.loadPolicy(POLICY_WATCH_HISTORY.policyName, POLICY_WATCH_HISTORY.fileName, xContentRegistry); + } + + private void addIndexLifecyclePolicyIfMissing(ClusterState state) { + if (historyPolicyCreationInProgress.compareAndSet(false, true)) { + final LifecyclePolicy policyOnDisk = loadWatcherHistoryPolicy(); + + Optional maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE)); + final boolean needsUpdating = maybeMeta + .flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName()))) + // TODO: do we want to leave an existing policy as-is instead of replacing it with the one on disk? + .map(current -> current.equals(policyOnDisk) == false) + .orElse(true); // If there is no policy then one needs to be put; + + if (needsUpdating) { + putPolicy(policyOnDisk, historyPolicyCreationInProgress); + } + } + } + + private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) { + final Executor executor = threadPool.generic(); + executor.execute(() -> { + PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request, + new ActionListener() { + @Override + public void onResponse(PutLifecycleAction.Response response) { + creationCheck.set(false); + if (response.isAcknowledged() == false) { + logger.error("error adding watcher index lifecycle policy [{}], request was not acknowledged", + policy.getName()); + } + } + + @Override + public void onFailure(Exception e) { + creationCheck.set(false); + logger.error(new ParameterizedMessage("error adding watcher index lifecycle policy [{}]", + policy.getName()), e); + } + }, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener)); + }); + } + public static boolean validate(ClusterState state) { return state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME) && state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) && @@ -153,9 +215,19 @@ public String getTemplateName() { public byte[] load() { String template = TemplateUtils.loadTemplate("/" + fileName + ".json", WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION, - Pattern.quote("${xpack.watcher.template.version}")); + Pattern.quote("${xpack.watcher.template.version}")); assert template != null && template.length() > 0; return template.getBytes(StandardCharsets.UTF_8); } } + public static class PolicyConfig { + + private final String policyName; + private String fileName; + + PolicyConfig(String templateName, String fileName) { + this.policyName = templateName; + this.fileName = fileName; + } + } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java index e93a86b93eb23..e2a25e9ccf6b9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -21,20 +22,42 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.indexlifecycle.AllocateAction; +import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction; +import org.elasticsearch.xpack.core.indexlifecycle.ForceMergeAction; +import org.elasticsearch.xpack.core.indexlifecycle.FreezeAction; +import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; +import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction; +import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; +import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; +import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; import org.junit.Before; import org.mockito.ArgumentCaptor; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.elasticsearch.mock.orig.Mockito.verify; import static org.elasticsearch.mock.orig.Mockito.when; @@ -50,6 +73,7 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase { private WatcherIndexTemplateRegistry registry; + private NamedXContentRegistry xContentRegistry; private Client client; @Before @@ -72,7 +96,20 @@ public void createRegistryAndClient() { }).when(indicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class)); ClusterService clusterService = mock(ClusterService.class); - registry = new WatcherIndexTemplateRegistry(clusterService, threadPool, client); + List entries = new ArrayList<>(ClusterModule.getNamedXWriteables()); + entries.addAll(Arrays.asList( + new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE), + (p) -> TimeseriesLifecycleType.INSTANCE), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse) + )); + xContentRegistry = new NamedXContentRegistry(entries); + registry = new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry); } public void testThatNonExistingTemplatesAreAddedImmediately() { @@ -91,6 +128,44 @@ public void testThatNonExistingTemplatesAreAddedImmediately() { verify(client.admin().indices(), times(4)).putTemplate(argumentCaptor.capture(), anyObject()); } + public void testThatNonExistingPoliciesAreAddedImmediately() { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), nodes); + registry.clusterChanged(event); + verify(client, times(1)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); + } + + public void testPolicyAlreadyExists() { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + Map policyMap = new HashMap<>(); + LifecyclePolicy policy = registry.loadWatcherHistoryPolicy(); + policyMap.put(policy.getName(), policy); + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes); + registry.clusterChanged(event); + verify(client, times(0)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); + } + + public void testPolicyAlreadyExistsButDiffers() throws IOException { + DiscoveryNode node = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + + Map policyMap = new HashMap<>(); + String policyStr = "{\"phases\":{\"delete\":{\"min_age\":\"1m\",\"actions\":{\"delete\":{}}}}}"; + LifecyclePolicy policy = registry.loadWatcherHistoryPolicy(); + try (XContentParser parser = XContentType.JSON.xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, policyStr)) { + LifecyclePolicy different = LifecyclePolicy.parse(parser, policy.getName()); + policyMap.put(policy.getName(), different); + ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes); + registry.clusterChanged(event); + verify(client, times(1)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); + } + } + public void testThatTemplatesExist() { assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history")), is(false)); assertThat(WatcherIndexTemplateRegistry.validate(createClusterState(".watch-history", ".triggered_watches", ".watches")), @@ -141,6 +216,12 @@ public void testThatMissingMasterNodeDoesNothing() { } private ClusterChangedEvent createClusterChangedEvent(List existingTemplateNames, DiscoveryNodes nodes) { + return createClusterChangedEvent(existingTemplateNames, Collections.emptyMap(), nodes); + } + + private ClusterChangedEvent createClusterChangedEvent(List existingTemplateNames, + Map existingPolicies, + DiscoveryNodes nodes) { ClusterChangedEvent event = mock(ClusterChangedEvent.class); when(event.localNodeMaster()).thenReturn(nodes.isLocalNodeElectedMaster()); ClusterState cs = mock(ClusterState.class); @@ -158,6 +239,10 @@ private ClusterChangedEvent createClusterChangedEvent(List existingTempl } when(metaData.getTemplates()).thenReturn(indexTemplates.build()); + + IndexLifecycleMetadata ilmMeta = mock(IndexLifecycleMetadata.class); + when(ilmMeta.getPolicies()).thenReturn(existingPolicies); + when(metaData.custom(anyObject())).thenReturn(ilmMeta); when(cs.metaData()).thenReturn(metaData); return event; diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 615fc8fbd08f9..6737de19baa13 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -49,6 +49,7 @@ import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; import org.elasticsearch.xpack.core.watcher.watch.ClockMock; import org.elasticsearch.xpack.core.watcher.watch.Watch; +import org.elasticsearch.xpack.indexlifecycle.IndexLifecycle; import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.notification.email.Authentication; import org.elasticsearch.xpack.watcher.notification.email.Email; @@ -161,6 +162,8 @@ protected List> pluginTypes() { } types.add(CommonAnalysisPlugin.class); + // ILM is required for watcher template index settings + types.add(IndexLifecycle.class); return types; } From 59b7ab74071007ee8359cfa1ea49409fea3c2d03 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 15 Jan 2019 09:50:40 -0700 Subject: [PATCH 2/8] Resolve TODO, if an existing policy exists, don't overwrite it --- .../xpack/watcher/support/WatcherIndexTemplateRegistry.java | 4 +--- .../watcher/support/WatcherIndexTemplateRegistryTests.java | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java index d1c12c1fe7479..b8ed7a071624d 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java @@ -153,9 +153,7 @@ private void addIndexLifecyclePolicyIfMissing(ClusterState state) { Optional maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE)); final boolean needsUpdating = maybeMeta .flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName()))) - // TODO: do we want to leave an existing policy as-is instead of replacing it with the one on disk? - .map(current -> current.equals(policyOnDisk) == false) - .orElse(true); // If there is no policy then one needs to be put; + .isPresent() == false; // If there is no policy then one needs to be put; if (needsUpdating) { putPolicy(policyOnDisk, historyPolicyCreationInProgress); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java index e2a25e9ccf6b9..72e6429a9064e 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java @@ -162,7 +162,7 @@ public void testPolicyAlreadyExistsButDiffers() throws IOException { policyMap.put(policy.getName(), different); ClusterChangedEvent event = createClusterChangedEvent(Collections.emptyList(), policyMap, nodes); registry.clusterChanged(event); - verify(client, times(1)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); + verify(client, times(0)).execute(eq(PutLifecycleAction.INSTANCE), anyObject(), anyObject()); } } From 474b5c3e099a2fbb68848a321253f54ac3f0d5f9 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 17 Jan 2019 08:59:56 -0700 Subject: [PATCH 3/8] Rename policy to watch-history-ilm-policy --- ...{watch-history-policy.json => watch-history-ilm-policy.json} | 0 x-pack/plugin/core/src/main/resources/watch-history.json | 2 +- .../xpack/watcher/support/WatcherIndexTemplateRegistry.java | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename x-pack/plugin/core/src/main/resources/{watch-history-policy.json => watch-history-ilm-policy.json} (100%) diff --git a/x-pack/plugin/core/src/main/resources/watch-history-policy.json b/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json similarity index 100% rename from x-pack/plugin/core/src/main/resources/watch-history-policy.json rename to x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json diff --git a/x-pack/plugin/core/src/main/resources/watch-history.json b/x-pack/plugin/core/src/main/resources/watch-history.json index cd3a116c778c1..db6fd4aff950a 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history.json +++ b/x-pack/plugin/core/src/main/resources/watch-history.json @@ -5,7 +5,7 @@ "index.number_of_shards": 1, "index.number_of_replicas": 0, "index.auto_expand_replicas": "0-1", - "index.lifecycle.name": "watch-history-policy", + "index.lifecycle.name": "watch-history-ilm-policy", "index.format": 6 }, "mappings": { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java index b8ed7a071624d..415fa0770b0a9 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java @@ -53,7 +53,7 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener { TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES }; - public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-policy", "/watch-history-policy.json"); + public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-ilm-policy", "/watch-history-ilm-policy.json"); private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class); From 9aa2f06841cd7d6d147f20cc361c37df81689594 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 17 Jan 2019 09:00:10 -0700 Subject: [PATCH 4/8] Remove unnecessary NamedXContentRegistry entries --- .../WatcherIndexTemplateRegistryTests.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java index 72e6429a9064e..60ca2b83b2f85 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistryTests.java @@ -34,17 +34,11 @@ import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.indexlifecycle.AllocateAction; import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction; -import org.elasticsearch.xpack.core.indexlifecycle.ForceMergeAction; -import org.elasticsearch.xpack.core.indexlifecycle.FreezeAction; import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; -import org.elasticsearch.xpack.core.indexlifecycle.ReadOnlyAction; -import org.elasticsearch.xpack.core.indexlifecycle.RolloverAction; -import org.elasticsearch.xpack.core.indexlifecycle.ShrinkAction; import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; @@ -100,14 +94,7 @@ public void createRegistryAndClient() { entries.addAll(Arrays.asList( new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE), (p) -> TimeseriesLifecycleType.INSTANCE), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), - new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse) - )); + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse))); xContentRegistry = new NamedXContentRegistry(entries); registry = new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry); } From bbcfdb9d25d360a41845dcc261d5c009f597093c Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 17 Jan 2019 09:00:27 -0700 Subject: [PATCH 5/8] Unset atomicboolean in else clause --- .../xpack/watcher/support/WatcherIndexTemplateRegistry.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java index 415fa0770b0a9..0fdb2b3a17d13 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/WatcherIndexTemplateRegistry.java @@ -157,6 +157,8 @@ private void addIndexLifecyclePolicyIfMissing(ClusterState state) { if (needsUpdating) { putPolicy(policyOnDisk, historyPolicyCreationInProgress); + } else { + historyPolicyCreationInProgress.set(false); } } } From f51ca5da1eb930b1dc749679fcec9933c3f0ac27 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 17 Jan 2019 10:50:31 -0700 Subject: [PATCH 6/8] Fix doc test that retrieved all policies instead of specific one --- docs/reference/ilm/apis/get-lifecycle.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ilm/apis/get-lifecycle.asciidoc b/docs/reference/ilm/apis/get-lifecycle.asciidoc index 9bdf14d970caa..a36703967af67 100644 --- a/docs/reference/ilm/apis/get-lifecycle.asciidoc +++ b/docs/reference/ilm/apis/get-lifecycle.asciidoc @@ -69,7 +69,7 @@ PUT _ilm/policy/my_policy [source,js] -------------------------------------------------- -GET _ilm/policy +GET _ilm/policy/my_policy -------------------------------------------------- // CONSOLE // TEST[continued] From 87410dabebba9599e8acf2f15cf3162fe9120648 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 17 Jan 2019 11:30:13 -0700 Subject: [PATCH 7/8] Fix another doc test --- docs/reference/ilm/update-lifecycle-policy.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ilm/update-lifecycle-policy.asciidoc b/docs/reference/ilm/update-lifecycle-policy.asciidoc index da3983d053c59..27869008d55ac 100644 --- a/docs/reference/ilm/update-lifecycle-policy.asciidoc +++ b/docs/reference/ilm/update-lifecycle-policy.asciidoc @@ -87,7 +87,7 @@ PUT _ilm/policy/my_policy ////////// [source,js] -------------------------------------------------- -GET _ilm/policy +GET _ilm/policy/my_policy -------------------------------------------------- // CONSOLE // TEST[continued] From 31c69f5c86ec810b4a00c95c5e1ad795a6093f3b Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 22 Jan 2019 16:22:52 -0700 Subject: [PATCH 8/8] Move from 30d -> 7d retention --- .../core/src/main/resources/watch-history-ilm-policy.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json b/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json index a6f4ab34cf150..e45e6b25e8f7b 100644 --- a/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json +++ b/x-pack/plugin/core/src/main/resources/watch-history-ilm-policy.json @@ -1,7 +1,7 @@ { "phases": { "delete": { - "min_age": "30d", + "min_age": "7d", "actions": { "delete": {} }