Skip to content

Commit

Permalink
Make BlobStoreRepository Aware of ClusterState
Browse files Browse the repository at this point in the history
This is a preliminary to elastic#49060.

It does not introduce any substantial behavior change to how the blob store repository
operates. What it does is to add all the infrastructure changes around passing the cluster service
to the blob store, associated test changes and a best effort approach to tracking the latest repository
generation on all nodes from cluster state updates. This brings a slight improvement to the consistency
by which non-master nodes (or master directly after a failover) will be able to determine the latest
repository generation. It does not however do any tricky checks for the situation after a repository operation
(create, delete or cleanup) that could theoretically be used to get even greater accuracy to keep this change
simple.
This change does not in any way alter the behavior of the blobstore repository other than adding a better "guess"
for the value of the latest repo generation and is mainly intended to isolate the actual logical change to how the
repository operates in elastic#49060
  • Loading branch information
original-brownbear committed Nov 27, 2019
1 parent b696c92 commit 62ecb71
Show file tree
Hide file tree
Showing 36 changed files with 264 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

package org.elasticsearch.plugin.repository.url;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.url.URLRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -46,8 +46,8 @@ public List<Setting<?>> getSettings() {

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap(URLRepository.TYPE,
metadata -> new URLRepository(metadata, env, namedXContentRegistry, threadPool));
metadata -> new URLRepository(metadata, env, namedXContentRegistry, clusterService));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
Expand All @@ -33,7 +34,6 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.net.MalformedURLException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -81,8 +81,8 @@ public class URLRepository extends BlobStoreRepository {
* Constructs a read-only URL-based repository
*/
public URLRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath());
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath());

if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
Expand All @@ -35,13 +35,12 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.mockito.Mockito.mock;

public class URLRepositoryTests extends ESTestCase {

private URLRepository createRepository(Settings baseSettings, RepositoryMetaData repositoryMetaData) {
return new URLRepository(repositoryMetaData, TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList()), mock(ThreadPool.class)) {
new NamedXContentRegistry(Collections.emptyList()), BlobStoreTestUtil.mockClusterService()) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
Expand All @@ -32,7 +33,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Locale;
import java.util.function.Function;
Expand Down Expand Up @@ -79,8 +79,8 @@ public AzureRepository(
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final AzureStorageService storageService,
final ThreadPool threadPool) {
super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata));
final ClusterService clusterService) {
super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata));
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.repositories.azure;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
Expand All @@ -31,7 +32,6 @@
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -60,9 +60,9 @@ AzureStorageService createAzureStoreService(final Settings settings) {

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap(AzureRepository.TYPE,
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, threadPool));
(metadata) -> new AzureRepository(metadata, namedXContentRegistry, azureStoreService, clusterService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.microsoft.azure.storage.LocationMode;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -32,6 +34,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class AzureRepositorySettingsTests extends ESTestCase {

Expand All @@ -41,9 +44,13 @@ private AzureRepository azureRepository(Settings settings) {
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
.put(settings)
.build();
final ThreadPool threadPool = mock(ThreadPool.class);
final ClusterService clusterService = mock(ClusterService.class);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
when(clusterApplierService.threadPool()).thenReturn(threadPool);
final AzureRepository azureRepository = new AzureRepository(new RepositoryMetaData("foo", "azure", internalSettings),
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class),
mock(ThreadPool.class));
NamedXContentRegistry.EMPTY, mock(AzureStorageService.class), clusterService);
assertThat(azureRepository.getBlobStore(), is(nullValue()));
return azureRepository;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.repositories.gcs;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -27,7 +28,6 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -52,9 +52,9 @@ protected GoogleCloudStorageService createStorageService() {

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
ClusterService clusterService) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, threadPool));
metadata -> new GoogleCloudStorageRepository(metadata, namedXContentRegistry, this.storageService, clusterService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -30,7 +31,6 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.Function;

Expand Down Expand Up @@ -64,8 +64,8 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final GoogleCloudStorageService storageService,
final ThreadPool threadPool) {
super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata));
final ClusterService clusterService) {
super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata));
this.storageService = storageService;

this.chunkSize = getSetting(CHUNK_SIZE, metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import fixture.gcs.FakeOAuth2HttpHandler;
import fixture.gcs.GoogleCloudStorageHttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.MockSecureSettings;
Expand All @@ -38,7 +39,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.threeten.bp.Duration;

import java.io.IOException;
Expand Down Expand Up @@ -170,9 +170,10 @@ StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clien
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry, ThreadPool threadPool) {
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry,
ClusterService clusterService) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, threadPool) {
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore("bucket", "test", storageService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.SecurityUtil;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

public final class HdfsPlugin extends Plugin implements RepositoryPlugin {

Expand Down Expand Up @@ -112,7 +112,7 @@ private static Void eagerInit() {

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ThreadPool threadPool) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, threadPool));
ClusterService clusterService) {
return Collections.singletonMap("hdfs", (metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -40,7 +41,6 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -67,8 +67,8 @@ public final class HdfsRepository extends BlobStoreRepository {
private static final ByteSizeValue DEFAULT_BUFFER_SIZE = new ByteSizeValue(100, ByteSizeUnit.KB);

public HdfsRepository(RepositoryMetaData metadata, Environment environment,
NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool) {
super(metadata, namedXContentRegistry, threadPool, BlobPath.cleanPath());
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService) {
super(metadata, namedXContentRegistry, clusterService, BlobPath.cleanPath());

this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
Expand All @@ -32,7 +33,6 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.function.Function;

Expand Down Expand Up @@ -152,8 +152,8 @@ class S3Repository extends BlobStoreRepository {
final RepositoryMetaData metadata,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service,
final ThreadPool threadPool) {
super(metadata, namedXContentRegistry, threadPool, buildBasePath(metadata));
final ClusterService clusterService) {
super(metadata, namedXContentRegistry, clusterService, buildBasePath(metadata));
this.service = service;

// Parse and validate the user's S3 Storage Class setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.amazonaws.util.json.Jackson;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -30,7 +31,6 @@
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.security.AccessController;
Expand Down Expand Up @@ -79,14 +79,14 @@ public S3RepositoryPlugin(final Settings settings) {
protected S3Repository createRepository(
final RepositoryMetaData metadata,
final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return new S3Repository(metadata, registry, service, threadPool);
final ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService);
}

@Override
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
final ThreadPool threadPool) {
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, threadPool));
final ClusterService clusterService) {
return Collections.singletonMap(S3Repository.TYPE, metadata -> createRepository(metadata, registry, clusterService));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -146,8 +146,8 @@ public ProxyS3RepositoryPlugin(Settings settings) {

@Override
protected S3Repository createRepository(RepositoryMetaData metadata,
NamedXContentRegistry registry, ThreadPool threadPool) {
return new S3Repository(metadata, registry, service, threadPool) {
NamedXContentRegistry registry, ClusterService clusterService) {
return new S3Repository(metadata, registry, service, clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
Expand Down
Loading

0 comments on commit 62ecb71

Please sign in to comment.