Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Use DirectoryFactory interface to create remote directory #6985

Merged
merged 1 commit into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Expand Down
10 changes: 3 additions & 7 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final NodeEnvironment nodeEnv;
private final ShardStoreDeleter shardStoreDeleter;
private final IndexStorePlugin.DirectoryFactory directoryFactory;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory;
private final CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper;
private final IndexCache indexCache;
Expand Down Expand Up @@ -194,7 +194,7 @@ public IndexService(
Client client,
QueryCache queryCache,
IndexStorePlugin.DirectoryFactory directoryFactory,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
IndexEventListener eventListener,
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> wrapperFactory,
MapperRegistry mapperRegistry,
Expand Down Expand Up @@ -470,11 +470,7 @@ public synchronized IndexShard createShard(

Store remoteStore = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(
this.indexSettings.getRemoteStoreRepository(),
this.indexSettings,
path
);
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @opensearch.internal
*/
public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory {
public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.DirectoryFactory {

private final Supplier<RepositoriesService> repositoriesService;

Expand All @@ -36,7 +36,8 @@ public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> reposito
}

@Override
public Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath path) throws IOException {
public Directory newDirectory(IndexSettings indexSettings, ShardPath path) throws IOException {
String repositoryName = indexSettings.getRemoteStoreRepository();
try (Repository repository = repositoriesService.get().repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath commonBlobPath = ((BlobStoreRepository) repository).basePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

@Override
Expand Down Expand Up @@ -320,7 +320,7 @@ public IndicesService(
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.settings = settings;
Expand Down Expand Up @@ -435,7 +435,7 @@ public IndicesService(
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.settings = settings;
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ protected Node(
rerouteServiceReference.set(rerouteService);
clusterService.setRerouteService(rerouteService);

final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(
repositoriesServiceReference::get
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,6 @@ interface DirectoryFactory {
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
* An interface that describes how to create a new remote directory instance per shard.
*/
@FunctionalInterface
interface RemoteDirectoryFactory {
/**
* Creates a new remote directory per shard. This method is called once per shard on shard creation.
* @param repositoryName repository name
* @param indexSettings the shards index settings
* @param shardPath the path the shard is using
* @return a new RemoteDirectory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath shardPath) throws IOException;
}

/**
* The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting
* {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ public void setup() {
}

public void testNewDirectory() throws IOException {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_INDEX_UUID, "uuid_1").build();
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_INDEX_UUID, "uuid_1")
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "remote_store_repository")
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));
Expand All @@ -66,7 +69,7 @@ public void testNewDirectory() throws IOException {

when(repositoriesService.repository("remote_store_repository")).thenReturn(repository);

try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)) {
try (Directory directory = remoteSegmentStoreDirectoryFactory.newDirectory(indexSettings, shardPath)) {
assertTrue(directory instanceof RemoteSegmentStoreDirectory);
ArgumentCaptor<BlobPath> blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class);
verify(blobStore, times(2)).blobContainer(blobPathCaptor.capture());
Expand All @@ -80,17 +83,14 @@ public void testNewDirectory() throws IOException {
}

public void testNewDirectoryRepositoryDoesNotExist() {
Settings settings = Settings.builder().build();
Settings settings = Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "remote_store_repository").build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings);
Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0");
ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0));

when(repositoriesService.repository("remote_store_repository")).thenThrow(new RepositoryMissingException("Missing"));

assertThrows(
IllegalArgumentException.class,
() -> remoteSegmentStoreDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)
);
assertThrows(IllegalArgumentException.class, () -> remoteSegmentStoreDirectoryFactory.newDirectory(indexSettings, shardPath));
}

}