From 207941d0f3ff9b9a1158e38d59856c877f91ecc5 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 11 May 2022 16:59:38 +0530 Subject: [PATCH] Add RemoteDirectoryFactory and use RemoteDirectory instance in RefreshListener Signed-off-by: Sachin Kale --- .../opensearch/index/shard/IndexShardIT.java | 3 +- .../org/opensearch/index/IndexModule.java | 4 ++ .../org/opensearch/index/IndexService.java | 28 +++++++- .../opensearch/index/shard/IndexShard.java | 14 +++- .../shard/RemoteStoreRefreshListener.java | 38 +++++++++++ .../index/store/RemoteDirectoryFactory.java | 32 +++++++++ .../opensearch/indices/IndicesService.java | 2 +- .../opensearch/plugins/IndexStorePlugin.java | 17 +++++ .../store/RemoteDirectoryFactoryTests.java | 65 +++++++++++++++++++ ...dicesLifecycleListenerSingleNodeTests.java | 2 +- .../index/shard/IndexShardTestCase.java | 3 +- 11 files changed, 200 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java create mode 100644 server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java create mode 100644 server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 5f014e89e330e..5e36a0f55e543 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -673,7 +673,8 @@ public static final IndexShard newIndexShard( Arrays.asList(listeners), () -> {}, RetentionLeaseSyncer.EMPTY, - cbs + cbs, + null ); } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index acceee7562068..2036253e75823 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -70,6 +70,7 @@ import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -118,6 +119,8 @@ public final class IndexModule { private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory(); + private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory(); + private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new; public static final Setting INDEX_STORE_TYPE_SETTING = new Setting<>( @@ -511,6 +514,7 @@ public IndexService newIndexService( client, queryCache, directoryFactory, + REMOTE_DIRECTORY_FACTORY, eventListener, readerWrapperFactory, mapperRegistry, diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 54aa7f5097d61..49b205431d267 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -81,6 +81,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexingOperationListener; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; @@ -95,6 +96,9 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -135,6 +139,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final NodeEnvironment nodeEnv; private final ShardStoreDeleter shardStoreDeleter; private final IndexStorePlugin.DirectoryFactory directoryFactory; + private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; private final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory; private final CheckedFunction readerWrapper; private final IndexCache indexCache; @@ -189,6 +194,7 @@ public IndexService( Client client, QueryCache queryCache, IndexStorePlugin.DirectoryFactory directoryFactory, + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, IndexEventListener eventListener, Function> wrapperFactory, MapperRegistry mapperRegistry, @@ -259,6 +265,7 @@ public IndexService( this.eventListener = eventListener; this.nodeEnv = nodeEnv; this.directoryFactory = directoryFactory; + this.remoteDirectoryFactory = remoteDirectoryFactory; this.recoveryStateFactory = recoveryStateFactory; this.engineFactory = Objects.requireNonNull(engineFactory); this.engineConfigFactory = Objects.requireNonNull(engineConfigFactory); @@ -423,7 +430,8 @@ private long getAvgShardSizeInBytes() throws IOException { public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, - final RetentionLeaseSyncer retentionLeaseSyncer + final RetentionLeaseSyncer retentionLeaseSyncer, + final RepositoriesService repositoriesService ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -497,6 +505,21 @@ public synchronized IndexShard createShard( } }; Directory directory = directoryFactory.newDirectory(this.indexSettings, path); + Directory remoteDirectory = null; + RemoteStoreRefreshListener remoteStoreRefreshListener = null; + if (this.indexSettings.isRemoteStoreEnabled()) { + try { + Repository repository = repositoriesService.repository("dragon-stone"); + remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository); + remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory); + } catch (RepositoryMissingException e) { + throw new IllegalArgumentException( + "Repository should be created before creating index with remote_store enabled setting", + e + ); + } + } + store = new Store( shardId, this.indexSettings, @@ -525,7 +548,8 @@ public synchronized IndexShard createShard( indexingOperationListeners, () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, - circuitBreakerService + circuitBreakerService, + remoteStoreRefreshListener ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d8fa560a4a812..28178abc6158a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -299,6 +299,8 @@ Runnable getGlobalCheckpointSyncer() { private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; + private final RemoteStoreRefreshListener remoteStoreRefreshListener; + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -319,7 +321,8 @@ public IndexShard( final List listeners, final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final CircuitBreakerService circuitBreakerService + final CircuitBreakerService circuitBreakerService, + final RemoteStoreRefreshListener remoteStoreRefreshListener ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -402,6 +405,7 @@ public boolean shouldCache(Query query) { persistMetadata(path, indexSettings, shardRouting, null, logger); this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases(); this.refreshPendingLocationListener = new RefreshPendingLocationListener(); + this.remoteStoreRefreshListener = remoteStoreRefreshListener; } public ThreadPool getThreadPool() { @@ -3100,6 +3104,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } }; + final List internalRefreshListener; + if (remoteStoreRefreshListener != null && shardRouting.primary()) { + internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), remoteStoreRefreshListener); + } else { + internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + } return this.engineConfigFactory.newEngineConfig( shardId, threadPool, @@ -3116,7 +3126,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, refreshPendingLocationListener), - Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), + internalRefreshListener, indexSort, circuitBreakerService, globalCheckpointSupplier, diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java new file mode 100644 index 0000000000000..c41fe0ba27177 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.store.Directory; + +import java.io.IOException; + +/** + * RefreshListener implementation to upload newly created segment files to the remote store + */ +public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { + + private final Directory storeDirectory; + private final Directory remoteDirectory; + + public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) { + this.storeDirectory = storeDirectory; + this.remoteDirectory = remoteDirectory; + } + + @Override + public void beforeRefresh() throws IOException { + // ToDo Add implementation + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + // ToDo Add implementation + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java new file mode 100644 index 0000000000000..0d3fdc27bbc30 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; + +public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { + + @Override + public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobPath blobPath = new BlobPath(); + blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); + BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); + return new RemoteDirectory(blobContainer); + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 87cd63119f1d4..007093c58e777 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -852,7 +852,7 @@ public IndexShard createShard( IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, repositoriesService); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index 2f549fec54759..52ddf6dcf2753 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -39,6 +39,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.repositories.Repository; import java.io.IOException; import java.util.Collections; @@ -66,6 +67,22 @@ interface DirectoryFactory { Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException; } + /** + * An interface that describes how to create a new remote directory instance per shard. + */ + @FunctionalInterface + interface RemoteDirectoryFactory { + /** + * Creates a new remote directory per shard. This method is called once per shard on shard creation. + * @param indexSettings the shards index settings + * @param shardPath the path the shard is using + * @param repository to get the BlobContainer details + * @return a new RemoteDirectory instance + * @throws IOException if an IOException occurs while opening the directory + */ + Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException; + } + /** * The {@link DirectoryFactory} mappings for this plugin. When an index is created the store type setting * {@link org.opensearch.index.IndexModule#INDEX_STORE_TYPE_SETTING} on the index will be examined and either use the default or a diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java new file mode 100644 index 0000000000000..d781fad9ab99c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.lucene.store.Directory; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; + +public class RemoteDirectoryFactoryTests extends OpenSearchTestCase { + + private RemoteDirectoryFactory remoteDirectoryFactory; + + @Before + public void setup() { + remoteDirectoryFactory = new RemoteDirectoryFactory(); + } + + public void testNewDirectory() throws IOException { + Settings settings = Settings.builder().build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); + Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); + ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); + BlobStoreRepository repository = mock(BlobStoreRepository.class); + BlobStore blobStore = mock(BlobStore.class); + BlobContainer blobContainer = mock(BlobContainer.class); + when(repository.blobStore()).thenReturn(blobStore); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); + + Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository); + assertTrue(directory instanceof RemoteDirectory); + ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); + verify(blobStore).blobContainer(blobPathCaptor.capture()); + BlobPath blobPath = blobPathCaptor.getValue(); + assertEquals("foo/0/", blobPath.buildAsString()); + + directory.listAll(); + verify(blobContainer).listBlobs(); + } +} diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 5f3d03f85f324..f4290affa136a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -148,7 +148,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY); + IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, null); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode( diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 509edfd1b9103..015b14c520721 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -477,7 +477,8 @@ protected IndexShard newShard( Arrays.asList(listeners), globalCheckpointSyncer, retentionLeaseSyncer, - breakerService + breakerService, + null ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true;