Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ssl settings to reindex from remote #37527

Merged
merged 13 commits into from
Jan 31, 2019
Merged
6 changes: 6 additions & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ unitTest {

dependencies {
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
compile project(':libs:elasticsearch-ssl-config')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

register the ssl-config project in the top-level build.gradle under projectSubstitutions, and then you can use compile "org.elasticsearch:elasticsearch-ssl-config:${version}"
Also check if the ssl-config is properly published through the release manager

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ywelsch, will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build.gradle change has been pushed, and there is an open PR for release manager.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually @ywelsch Is there a reason you think this should be in release-manager? I don't think we publish any reindex jars that would need this.
As an example we don't publish the grok libs that are used in ingest modules.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right. No need for that.

// for http - testing reindex from remote
testCompile project(path: ':modules:transport-netty4', configuration: 'runtime')
// for parent/child testing
Expand All @@ -71,6 +72,11 @@ thirdPartyAudit.ignoreMissingClasses (
'org.apache.log.Logger',
)

forbiddenPatterns {
// PKCS#12 file are not UTF-8
exclude '**/*.p12'
}

// Support for testing reindex-from-remote against old Elaticsearch versions
configurations {
oldesFixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.action.bulk.Retry;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -82,14 +83,16 @@
* Abstract base for scrolling across a search and executing bulk actions on all results. All package private methods are package private so
* their tests can use them. Most methods run in the listener thread pool because the are meant to be fast and don't expect to block.
*/
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>> {
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>,
Action extends TransportAction<Request, ?>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduction of the Action as a generic type parameter and field (mainAction below) is an attempt to solve the problems that come from the way AbstractAsyncBulkByScrollAction calls buildScrollableResultSource from its constructor.
Because the call is made from the superclass's constructor, it means that TransportReindexAction.AsyncIndexBySearchAction.buildScrollableResultSource (and by extension buildRestClient) can't make use of any fields in the subclass.
But buildRestClient needs access to the ReindexSslConfig instance, so we either need to push that up into the base class, or come up with a different approach.
By pushing a generic mainAction up into the base class, we can access fields from the Action without needing to duplicate them in AbstractAsyncBulkByScrollAction. This allows us to also remove scriptService from this class.


protected final Logger logger;
protected final BulkByScrollTask task;
protected final WorkerBulkByScrollTaskState worker;
protected final ThreadPool threadPool;
protected final ScriptService scriptService;
protected final ClusterState clusterState;

protected final Action mainAction;
/**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
* requests of this mainRequest.
Expand All @@ -112,7 +115,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;

public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ScriptService scriptService, ClusterState clusterState,
ThreadPool threadPool, Action mainAction, Request mainRequest, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {

this.task = task;
Expand All @@ -124,8 +127,8 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, Logger logger, Par
this.logger = logger;
this.client = client;
this.threadPool = threadPool;
this.scriptService = scriptService;
this.clusterState = clusterState;
this.mainAction = mainAction;
this.mainRequest = mainRequest;
this.listener = listener;
BackoffPolicy backoffPolicy = buildBackoffPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
/**
* Implementation of delete-by-query using scrolling and bulk.
*/
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
ScriptService scriptService, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
super(task, logger, client, threadPool, action, request, clusterState, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,32 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

Expand Down Expand Up @@ -69,8 +80,19 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestRethrottleAction(settings, restController, nodesInCluster));
}

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singletonList(new ReindexSslConfig(environment.settings(), environment, resourceWatcherService));
}

@Override
public List<Setting<?>> getSettings() {
return singletonList(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
final List<Setting<?>> settings = new ArrayList<>();
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.ssl.SslConfiguration;
import org.elasticsearch.common.ssl.SslConfigurationKeys;
import org.elasticsearch.common.ssl.SslConfigurationLoader;
import org.elasticsearch.env.Environment;
import org.elasticsearch.watcher.FileChangesListener;
import org.elasticsearch.watcher.FileWatcher;
import org.elasticsearch.watcher.ResourceWatcherService;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.common.settings.Setting.listSetting;
import static org.elasticsearch.common.settings.Setting.simpleString;

/**
* Loads "reindex.ssl.*" configuration from Settings, and makes the applicable configuration (trust manager / key manager / hostname
* verification / cipher-suites) available for reindex-from-remote.
*/
class ReindexSslConfig {

private static final Map<String, Setting<?>> SETTINGS = new HashMap<>();
private static final Map<String, Setting<SecureString>> SECURE_SETTINGS = new HashMap<>();

static {
Setting.Property[] defaultProperties = new Setting.Property[] { Setting.Property.NodeScope, Setting.Property.Filtered };
Setting.Property[] deprecatedProperties = new Setting.Property[] { Setting.Property.Deprecated, Setting.Property.NodeScope,
Setting.Property.Filtered };
for (String key : SslConfigurationKeys.getStringKeys()) {
String settingName = "reindex.ssl." + key;
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
SETTINGS.put(settingName, simpleString(settingName, properties));
}
for (String key : SslConfigurationKeys.getListKeys()) {
String settingName = "reindex.ssl." + key;
final Setting.Property[] properties = SslConfigurationKeys.isDeprecated(key) ? deprecatedProperties : defaultProperties;
SETTINGS.put(settingName, listSetting(settingName, Collections.emptyList(), Function.identity(), properties));
}
for (String key : SslConfigurationKeys.getSecureStringKeys()) {
String settingName = "reindex.ssl." + key;
SECURE_SETTINGS.put(settingName, SecureSetting.secureString(settingName, null));
}
}

private final SslConfiguration configuration;
private volatile SSLContext context;

public static List<Setting<?>> getSettings() {
List<Setting<?>> settings = new ArrayList<>();
settings.addAll(SETTINGS.values());
settings.addAll(SECURE_SETTINGS.values());
return settings;
}

ReindexSslConfig(Settings settings, Environment environment, ResourceWatcherService resourceWatcher) {
final SslConfigurationLoader loader = new SslConfigurationLoader("reindex.ssl.") {

@Override
protected String getSettingAsString(String key) {
return settings.get(key);
}

@Override
protected char[] getSecureSetting(String key) {
final Setting<SecureString> setting = SECURE_SETTINGS.get(key);
if (setting == null) {
throw new IllegalArgumentException("The secure setting [" + key + "] is not registered");
}
return setting.get(settings).getChars();
}

@Override
protected List<String> getSettingAsList(String key) throws Exception {
return settings.getAsList(key);
}
};
configuration = loader.load(environment.configFile());
reload();

final FileChangesListener listener = new FileChangesListener() {
@Override
public void onFileCreated(Path file) {
onFileChanged(file);
}

@Override
public void onFileDeleted(Path file) {
onFileChanged(file);
}

@Override
public void onFileChanged(Path file) {
ReindexSslConfig.this.reload();
}
};
for (Path file : configuration.getDependentFiles()) {
try {
final FileWatcher watcher = new FileWatcher(file);
watcher.addListener(listener);
resourceWatcher.add(watcher, ResourceWatcherService.Frequency.HIGH);
} catch (IOException e) {
throw new UncheckedIOException("cannot watch file [" + file + "]", e);
}
}
}

private void reload() {
this.context = configuration.createSslContext();
}

/**
* Encapsulate the loaded SSL configuration as a HTTP-client {@link SSLIOSessionStrategy}.
* The returned strategy is immutable, but successive calls will return different objects that may have different
* configurations if the underlying key/certificate files are modified.
*/
SSLIOSessionStrategy getStrategy() {
final HostnameVerifier hostnameVerifier = configuration.getVerificationMode().isHostnameVerificationEnabled()
? new DefaultHostnameVerifier()
: new NoopHostnameVerifier();
final String[] protocols = configuration.getSupportedProtocols().toArray(Strings.EMPTY_ARRAY);
final String[] cipherSuites = configuration.getCipherSuites().toArray(Strings.EMPTY_ARRAY);
return new SSLIOSessionStrategy(context, protocols, cipherSuites, hostnameVerifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
ClusterState state = clusterService.state();
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, request, scriptService, state,
new AsyncDeleteByQueryAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, scriptService, state,
listener).start();
}
);
Expand Down
Loading