diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 2cea0e4e3e95c..46c2b91b4b99a 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -119,8 +119,6 @@ public final class IndexModule { private static final FsDirectoryFactory DEFAULT_DIRECTORY_FACTORY = new FsDirectoryFactory(); - private static final RemoteDirectoryFactory REMOTE_DIRECTORY_FACTORY = new RemoteDirectoryFactory(); - private static final IndexStorePlugin.RecoveryStateFactory DEFAULT_RECOVERY_STATE_FACTORY = RecoveryState::new; public static final Setting INDEX_STORE_TYPE_SETTING = new Setting<>( @@ -189,9 +187,9 @@ public final class IndexModule { * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins * via {@link org.opensearch.plugins.PluginsService#onIndexModule(IndexModule)}. * - * @param indexSettings the index settings - * @param analysisRegistry the analysis registry - * @param engineFactory the engine factory + * @param indexSettings the index settings + * @param analysisRegistry the analysis registry + * @param engineFactory the engine factory * @param directoryFactories the available store types */ public IndexModule( @@ -476,7 +474,8 @@ public IndexService newIndexService( IndicesFieldDataCache indicesFieldDataCache, NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, - ValuesSourceRegistry valuesSourceRegistry + ValuesSourceRegistry valuesSourceRegistry, + RemoteDirectoryFactory remoteDirectoryFactory ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -519,7 +518,7 @@ public IndexService newIndexService( client, queryCache, directoryFactory, - REMOTE_DIRECTORY_FACTORY, + remoteDirectoryFactory, eventListener, readerWrapperFactory, mapperRegistry, diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index f699278919d6b..9109bd533102f 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -81,7 +81,6 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.shard.IndexingOperationListener; -import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardNotFoundException; @@ -97,9 +96,6 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.Repository; -import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -437,8 +433,7 @@ public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher, - final RepositoriesService repositoriesService + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -511,22 +506,24 @@ public synchronized IndexShard createShard( warmer.warm(reader, shard, IndexService.this.indexSettings); } }; - Directory directory = directoryFactory.newDirectory(this.indexSettings, path); - Directory remoteDirectory = null; - RemoteStoreRefreshListener remoteStoreRefreshListener = null; + + Store remoteStore = null; if (this.indexSettings.isRemoteStoreEnabled()) { - try { - Repository repository = repositoriesService.repository(clusterService.state().metadata().clusterUUID()); - remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repository); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(directory, remoteDirectory); - } catch (RepositoryMissingException e) { - throw new IllegalArgumentException( - "Repository should be created before creating index with remote_store enabled setting", - e - ); - } + Directory remoteDirectory = remoteDirectoryFactory.newDirectory( + clusterService.state().metadata().clusterUUID(), + this.indexSettings, + path + ); + remoteStore = new Store( + shardId, + this.indexSettings, + remoteDirectory, + lock, + new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)) + ); } + Directory directory = directoryFactory.newDirectory(this.indexSettings, path); store = new Store( shardId, this.indexSettings, @@ -557,7 +554,7 @@ public synchronized IndexShard createShard( retentionLeaseSyncer, circuitBreakerService, this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null, - remoteStoreRefreshListener + remoteStore ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index dff5fcdba4239..07c74849e0d55 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -48,6 +48,8 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.ThreadInterruptedException; import org.opensearch.Assertions; @@ -305,7 +307,7 @@ Runnable getGlobalCheckpointSyncer() { private volatile boolean useRetentionLeasesInPeerRecovery; private final ReferenceManager.RefreshListener checkpointRefreshListener; - private final RemoteStoreRefreshListener remoteStoreRefreshListener; + private final Store remoteStore; public IndexShard( final ShardRouting shardRouting, @@ -329,7 +331,7 @@ public IndexShard( final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable final RemoteStoreRefreshListener remoteStoreRefreshListener + @Nullable final Store remoteStore ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -417,7 +419,7 @@ public boolean shouldCache(Query query) { } else { this.checkpointRefreshListener = null; } - this.remoteStoreRefreshListener = remoteStoreRefreshListener; + this.remoteStore = remoteStore; } public ThreadPool getThreadPool() { @@ -428,6 +430,10 @@ public Store store() { return this.store; } + public Store remoteStore() { + return this.remoteStore; + } + /** * Return the sort order of this index, or null if the index has no sort. */ @@ -1638,7 +1644,8 @@ public void close(String reason, boolean flushEngine) throws IOException { } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions); + // Closing remoteStore as a part of IndexShard close. null check is handled by IOUtils + IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, remoteStore); indexShardOperationPermits.close(); } } @@ -3192,7 +3199,7 @@ private DocumentMapperForType docMapper() { return mapperService.documentMapperWithAutoCreate(); } - private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { + private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) throws IOException { final Sort indexSort = indexSortSupplier.get(); final Engine.Warmer warmer = reader -> { assert Thread.holdsLock(mutex) == false : "warming engine under mutex"; @@ -3204,8 +3211,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); - if (remoteStoreRefreshListener != null && shardRouting.primary()) { - internalRefreshListener.add(remoteStoreRefreshListener); + if (isRemoteStoreEnabled()) { + Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate(); + internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory)); } if (this.checkpointRefreshListener != null) { internalRefreshListener.add(checkpointRefreshListener); @@ -3238,6 +3246,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { ); } + private boolean isRemoteStoreEnabled() { + return (remoteStore != null && shardRouting.primary()); + } + /** * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 2f8f977537327..855457f275122 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -94,11 +94,12 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } /** - * Closes the directory by deleting all the files in this directory + * Closes the remote directory. Currently, it is a no-op. + * If remote directory maintains a state in future, we need to clean it before closing the directory */ @Override public void close() throws IOException { - blobContainer.delete(); + // Do nothing } /** diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java index eb7912a1f4a2b..62f398cdad207 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectoryFactory.java @@ -14,10 +14,13 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.util.function.Supplier; /** * Factory for a remote store directory @@ -26,12 +29,23 @@ */ public class RemoteDirectoryFactory implements IndexStorePlugin.RemoteDirectoryFactory { + private final Supplier repositoriesService; + + public RemoteDirectoryFactory(Supplier repositoriesService) { + this.repositoriesService = repositoriesService; + } + @Override - public Directory newDirectory(IndexSettings indexSettings, ShardPath path, Repository repository) throws IOException { - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobPath blobPath = new BlobPath(); - blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); - BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); - return new RemoteDirectory(blobContainer); + public Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath path) throws IOException { + try (Repository repository = repositoriesService.get().repository(repositoryName)) { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobPath blobPath = new BlobPath(); + blobPath = blobPath.add(indexSettings.getIndex().getName()).add(String.valueOf(path.getShardId().getId())); + BlobContainer blobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(blobPath); + return new RemoteDirectory(blobContainer); + } catch (RepositoryMissingException e) { + throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); + } } + } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b2f6f10c19638..fdb609ba7bbff 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -132,6 +132,7 @@ import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -265,6 +266,7 @@ public class IndicesService extends AbstractLifecycleComponent private final Set danglingIndicesToWrite = Sets.newConcurrentHashSet(); private final boolean nodeWriteDanglingIndicesInfo; private final ValuesSourceRegistry valuesSourceRegistry; + private final RemoteDirectoryFactory remoteDirectoryFactory; @Override protected void doStart() { @@ -292,7 +294,8 @@ public IndicesService( Collection>> engineFactoryProviders, Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, - Map recoveryStateFactories + Map recoveryStateFactories, + RemoteDirectoryFactory remoteDirectoryFactory ) { this.settings = settings; this.threadPool = threadPool; @@ -386,6 +389,7 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); + this.remoteDirectoryFactory = remoteDirectoryFactory; } private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask"; @@ -745,7 +749,8 @@ private synchronized IndexService createIndexService( indicesFieldDataCache, namedWriteableRegistry, this::isIdFieldDataEnabled, - valuesSourceRegistry + valuesSourceRegistry, + remoteDirectoryFactory ); } @@ -859,13 +864,7 @@ public IndexShard createShard( IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard( - shardRouting, - globalCheckpointSyncer, - retentionLeaseSyncer, - checkpointPublisher, - repositoriesService - ); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4b4fdc974f8cb..4e0fb39db80d2 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -41,6 +41,7 @@ import org.opensearch.indices.replication.SegmentReplicationSourceFactory; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.SegmentReplicationSourceService; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -622,6 +623,8 @@ protected Node( rerouteServiceReference.set(rerouteService); clusterService.setRerouteService(rerouteService); + final RemoteDirectoryFactory remoteDirectoryFactory = new RemoteDirectoryFactory(repositoriesServiceReference::get); + final IndicesService indicesService = new IndicesService( settings, pluginsService, @@ -642,7 +645,8 @@ protected Node( engineFactoryProviders, indexStoreFactories, searchModule.getValuesSourceRegistry(), - recoveryStateFactories + recoveryStateFactories, + remoteDirectoryFactory ); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index 52ddf6dcf2753..1dc90a21c2f70 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -39,7 +39,6 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.repositories.Repository; import java.io.IOException; import java.util.Collections; @@ -74,13 +73,13 @@ interface DirectoryFactory { interface RemoteDirectoryFactory { /** * Creates a new remote directory per shard. This method is called once per shard on shard creation. + * @param repositoryName repository name * @param indexSettings the shards index settings * @param shardPath the path the shard is using - * @param repository to get the BlobContainer details * @return a new RemoteDirectory instance * @throws IOException if an IOException occurs while opening the directory */ - Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, Repository repository) throws IOException; + Directory newDirectory(String repositoryName, IndexSettings indexSettings, ShardPath shardPath) throws IOException; } /** diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index daa9186dfd8c0..45d93a5a12847 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -54,6 +54,7 @@ import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; +import org.opensearch.common.UUIDs; import org.opensearch.common.breaker.CircuitBreaker; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -88,6 +89,7 @@ import org.opensearch.index.similarity.NonNegativeScoresSimilarity; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.analysis.AnalysisModule; @@ -98,6 +100,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.internal.ReaderContext; import org.opensearch.test.ClusterServiceUtils; @@ -107,6 +110,8 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.hamcrest.Matchers; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.Collections; @@ -151,6 +156,7 @@ public void addPendingDelete(ShardId shardId, IndexSettings indexSettings) {} private BigArrays bigArrays; private ScriptService scriptService; private ClusterService clusterService; + private RepositoriesService repositoriesService; @Override public void setUp() throws Exception { @@ -183,6 +189,24 @@ public void setUp() throws Exception { clusterService = ClusterServiceUtils.createClusterService(threadPool); nodeEnvironment = new NodeEnvironment(settings, environment); mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); + TransportService transportService = new TransportService( + settings, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ); + repositoriesService = new RepositoriesService( + settings, + clusterService, + transportService, + Collections.emptyMap(), + Collections.emptyMap(), + threadPool + ); + } @Override @@ -209,7 +233,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { new IndicesFieldDataCache(settings, listener), writableRegistry(), () -> false, - null + null, + new RemoteDirectoryFactory(() -> repositoriesService) ); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java index d781fad9ab99c..e8357d2c184bf 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryFactoryTests.java @@ -18,6 +18,8 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; @@ -25,6 +27,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Collections; +import java.util.function.Supplier; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -33,11 +36,16 @@ public class RemoteDirectoryFactoryTests extends OpenSearchTestCase { + private Supplier repositoriesServiceSupplier; + private RepositoriesService repositoriesService; private RemoteDirectoryFactory remoteDirectoryFactory; @Before public void setup() { - remoteDirectoryFactory = new RemoteDirectoryFactory(); + repositoriesServiceSupplier = mock(Supplier.class); + repositoriesService = mock(RepositoriesService.class); + when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); + remoteDirectoryFactory = new RemoteDirectoryFactory(repositoriesServiceSupplier); } public void testNewDirectory() throws IOException { @@ -52,14 +60,33 @@ public void testNewDirectory() throws IOException { when(blobStore.blobContainer(any())).thenReturn(blobContainer); when(blobContainer.listBlobs()).thenReturn(Collections.emptyMap()); - Directory directory = remoteDirectoryFactory.newDirectory(indexSettings, shardPath, repository); - assertTrue(directory instanceof RemoteDirectory); - ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); - verify(blobStore).blobContainer(blobPathCaptor.capture()); - BlobPath blobPath = blobPathCaptor.getValue(); - assertEquals("foo/0/", blobPath.buildAsString()); + when(repositoriesService.repository("remote_store_repository")).thenReturn(repository); - directory.listAll(); - verify(blobContainer).listBlobs(); + try (Directory directory = remoteDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath)) { + assertTrue(directory instanceof RemoteDirectory); + ArgumentCaptor blobPathCaptor = ArgumentCaptor.forClass(BlobPath.class); + verify(blobStore).blobContainer(blobPathCaptor.capture()); + BlobPath blobPath = blobPathCaptor.getValue(); + assertEquals("foo/0/", blobPath.buildAsString()); + + directory.listAll(); + verify(blobContainer).listBlobs(); + verify(repositoriesService).repository("remote_store_repository"); + } + } + + public void testNewDirectoryRepositoryDoesNotExist() { + Settings settings = Settings.builder().build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); + Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); + ShardPath shardPath = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); + + when(repositoriesService.repository("remote_store_repository")).thenThrow(new RepositoryMissingException("Missing")); + + assertThrows( + IllegalArgumentException.class, + () -> remoteDirectoryFactory.newDirectory("remote_store_repository", indexSettings, shardPath) + ); } + } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index c2c365d9140df..76d4d50022042 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -109,18 +109,6 @@ public void testOpenInputNoSuchFileException() throws IOException { assertThrows(NoSuchFileException.class, () -> remoteDirectory.openInput("segment_1", IOContext.DEFAULT)); } - public void testClose() throws IOException { - remoteDirectory.close(); - - verify(blobContainer).delete(); - } - - public void testCloseIOException() throws IOException { - when(blobContainer.delete()).thenThrow(new IOException("Error while writing to blob store")); - - assertThrows(IOException.class, () -> remoteDirectory.close()); - } - public void testFileLength() throws IOException { Map fileInfo = new HashMap<>(); fileInfo.put("segment_1", new PlainBlobMetadata("segment_1", 100)); diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 213a22539971f..0989bf869f18e 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -153,8 +153,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - SegmentReplicationCheckpointPublisher.EMPTY, - null + SegmentReplicationCheckpointPublisher.EMPTY ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 68a6af25a7c82..9558e898f8832 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -171,6 +171,7 @@ import org.opensearch.index.seqno.GlobalCheckpointSyncAction; import org.opensearch.index.seqno.RetentionLeaseSyncer; import org.opensearch.index.shard.PrimaryReplicaSyncer; +import org.opensearch.index.store.RemoteDirectoryFactory; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesService; import org.opensearch.indices.ShardLimitValidator; @@ -1818,7 +1819,8 @@ public void onFailure(final Exception e) { Collections.emptyList(), emptyMap(), null, - emptyMap() + emptyMap(), + new RemoteDirectoryFactory(() -> repositoriesService) ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService(