Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ILM for Watcher history deletion #37443

Merged
merged 10 commits into from
Jan 23, 2019
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.indexlifecycle;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* A utility class used for loading index lifecycle policies from the resource classpath
*/
public class LifecyclePolicyUtils {

private LifecyclePolicyUtils() {};

/**
* Loads a built-in index lifecycle policy and returns its source.
*/
public static LifecyclePolicy loadPolicy(String name, String resource, NamedXContentRegistry xContentRegistry) {
try {
BytesReference source = load(resource);
validate(source);

try (XContentParser parser = XContentType.JSON.xContent()
.createParser(xContentRegistry, LoggingDeprecationHandler.THROW_UNSUPPORTED_OPERATION, source.utf8ToString())) {
return LifecyclePolicy.parse(parser, name);
}
} catch (Exception e) {
throw new IllegalArgumentException("unable to load policy [" + name + "] from [" + resource + "]", e);
}
}

/**
* Loads a resource from the classpath and returns it as a {@link BytesReference}
*/
private static BytesReference load(String name) throws IOException {
dakrone marked this conversation as resolved.
Show resolved Hide resolved
try (InputStream is = LifecyclePolicyUtils.class.getResourceAsStream(name)) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Streams.copy(is, out);
return new BytesArray(out.toByteArray());
}
}
}

/**
* Parses and validates that the source is not empty.
*/
private static void validate(BytesReference source) {
if (source == null) {
throw new ElasticsearchParseException("policy must not be null");
}

try {
XContentHelper.convertToMap(source, false, XContentType.JSON).v2();
} catch (NotXContentException e) {
throw new ElasticsearchParseException("policy must not be empty");
} catch (Exception e) {
throw new ElasticsearchParseException("invalid policy", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"phases": {
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
1 change: 1 addition & 0 deletions x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.lifecycle.name": "watch-history-ilm-policy",
"index.format": 6
},
"mappings": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String lifecycleName = restRequest.param("name");
XContentParser parser = restRequest.contentParser();
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser);
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));
try (XContentParser parser = restRequest.contentParser()) {
PutLifecycleAction.Request putLifecycleRequest = PutLifecycleAction.Request.parseRequest(lifecycleName, parser);
putLifecycleRequest.timeout(restRequest.paramAsTime("timeout", putLifecycleRequest.timeout()));
putLifecycleRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", putLifecycleRequest.masterNodeTimeout()));

return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
return channel -> client.execute(PutLifecycleAction.INSTANCE, putLifecycleRequest, new RestToXContentListener<>(channel));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ public ClusterState execute(ClusterState currentState) throws Exception {
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(request.getPolicy(), filteredHeaders,
nextVersion, Instant.now().toEpochMilli());
newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
if (oldPolicy == null) {
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
} else {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
dakrone marked this conversation as resolved.
Show resolved Hide resolved
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, OperationMode.RUNNING);
newState.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ integTestCluster {
dependsOn copyKeyCerts
setting 'xpack.security.enabled', 'true'
setting 'xpack.ml.enabled', 'true'
setting 'xpack.watcher.enabled', 'false'
setting 'logger.org.elasticsearch.xpack.ml.datafeed', 'TRACE'
setting 'xpack.monitoring.enabled', 'false'
setting 'xpack.security.authc.token.enabled', 'true'
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/watcher/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies {
compileOnly project(path: ':plugins:transport-nio', configuration: 'runtime')

testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile "org.elasticsearch.plugin:x-pack-ilm:${version}"

// watcher deps
compile 'com.googlecode.owasp-java-html-sanitizer:owasp-java-html-sanitizer:r239'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
throw new UncheckedIOException(e);
}

new WatcherIndexTemplateRegistry(clusterService, threadPool, client);
new WatcherIndexTemplateRegistry(clusterService, threadPool, client, xContentRegistry);

// http client
httpClient = new HttpClient(settings, getSslService(), cryptoService, clusterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand All @@ -46,17 +53,23 @@ public class WatcherIndexTemplateRegistry implements ClusterStateListener {
TEMPLATE_CONFIG_TRIGGERED_WATCHES, TEMPLATE_CONFIG_WATCH_HISTORY, TEMPLATE_CONFIG_WATCHES
};

public static final PolicyConfig POLICY_WATCH_HISTORY = new PolicyConfig("watch-history-ilm-policy", "/watch-history-ilm-policy.json");

private static final Logger logger = LogManager.getLogger(WatcherIndexTemplateRegistry.class);

private final Client client;
private final ThreadPool threadPool;
private final TemplateConfig[] indexTemplates;
private final NamedXContentRegistry xContentRegistry;
private final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
private final AtomicBoolean historyPolicyCreationInProgress = new AtomicBoolean();

public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) {
public WatcherIndexTemplateRegistry(ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
this.client = client;
this.threadPool = threadPool;
this.indexTemplates = TEMPLATE_CONFIGS;
this.xContentRegistry = xContentRegistry;
clusterService.addListener(this);
}

Expand All @@ -82,6 +95,7 @@ public void clusterChanged(ClusterChangedEvent event) {

if (event.localNodeMaster() || localNodeVersionAfterMaster) {
addTemplatesIfMissing(state);
addIndexLifecyclePolicyIfMissing(state);
dakrone marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -127,6 +141,54 @@ public void onFailure(Exception e) {
});
}

// Package visible for testing
LifecyclePolicy loadWatcherHistoryPolicy() {
return LifecyclePolicyUtils.loadPolicy(POLICY_WATCH_HISTORY.policyName, POLICY_WATCH_HISTORY.fileName, xContentRegistry);
}

private void addIndexLifecyclePolicyIfMissing(ClusterState state) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any concerns about multiple nodes entering this path and putting the policy multiple times ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this only happens when the local node is master, it should only get called once with the atomic compareAndSet below. Also any subsequent calls would see that the policy already existed and not replace it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh.. thanks, I missed this will only run on the master.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit: it is not only the master node, but is also put when this node is newer than the master node. The reason for this is, that with regards to the watcher history, a node that is not the master node might be updated first in a cluster, and that one could write the watch history in a format which requires a new watch history template.

I am not sure if this applies to ILM as well, as this is an administrative task compared to the fact that the watch history is written by non master nodes.

if (historyPolicyCreationInProgress.compareAndSet(false, true)) {
final LifecyclePolicy policyOnDisk = loadWatcherHistoryPolicy();

Optional<IndexLifecycleMetadata> maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE));
final boolean needsUpdating = maybeMeta
.flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policyOnDisk.getName())))
.isPresent() == false; // If there is no policy then one needs to be put;

if (needsUpdating) {
putPolicy(policyOnDisk, historyPolicyCreationInProgress);
} else {
historyPolicyCreationInProgress.set(false);
}
dakrone marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, request,
new ActionListener<PutLifecycleAction.Response>() {
@Override
public void onResponse(PutLifecycleAction.Response response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) {
logger.error("error adding watcher index lifecycle policy [{}], request was not acknowledged",
policy.getName());
}
}

@Override
public void onFailure(Exception e) {
creationCheck.set(false);
logger.error(new ParameterizedMessage("error adding watcher index lifecycle policy [{}]",
policy.getName()), e);
}
}, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener));
});
}

public static boolean validate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME) &&
state.getMetaData().getTemplates().containsKey(WatcherIndexTemplateRegistryField.TRIGGERED_TEMPLATE_NAME) &&
Expand All @@ -153,9 +215,19 @@ public String getTemplateName() {

public byte[] load() {
String template = TemplateUtils.loadTemplate("/" + fileName + ".json", WatcherIndexTemplateRegistryField.INDEX_TEMPLATE_VERSION,
Pattern.quote("${xpack.watcher.template.version}"));
Pattern.quote("${xpack.watcher.template.version}"));
assert template != null && template.length() > 0;
return template.getBytes(StandardCharsets.UTF_8);
}
}
public static class PolicyConfig {

private final String policyName;
private String fileName;

PolicyConfig(String templateName, String fileName) {
this.policyName = templateName;
this.fileName = fileName;
}
}
}
Loading