From 386a26e3084ba655720f9f0bc8033da84c6dfb47 Mon Sep 17 00:00:00 2001 From: Dharmesh Date: Mon, 3 Jul 2023 14:22:31 +0530 Subject: [PATCH] [Remote Store] Moving stale commit deletion to async flow (#8201) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Signed-off-by: Dharmesh 💤 Signed-off-by: sahil buddharaju --- .../RemoteStoreBaseIntegTestCase.java | 8 ++ .../opensearch/remotestore/RemoteStoreIT.java | 39 ++++++++++ .../opensearch/index/shard/IndexShard.java | 2 +- .../shard/RemoteStoreRefreshListener.java | 13 +--- .../opensearch/index/shard/StoreRecovery.java | 9 ++- .../store/RemoteSegmentStoreDirectory.java | 44 ++++++++++- .../RemoteSegmentStoreDirectoryFactory.java | 8 +- .../main/java/org/opensearch/node/Node.java | 3 +- .../opensearch/index/IndexModuleTests.java | 2 +- ...moteSegmentStoreDirectoryFactoryTests.java | 5 +- .../RemoteSegmentStoreDirectoryTests.java | 73 ++++++++++++++++--- .../snapshots/SnapshotResiliencyTests.java | 2 +- .../index/shard/IndexShardTestCase.java | 2 +- 13 files changed, 175 insertions(+), 35 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index d226d0d757638..336646b35b5a6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -15,6 +15,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; @@ -74,6 +75,13 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return remoteStoreIndexSettings(numberOfReplicas, 1); } + protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit) + .build(); + } + protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) { return Settings.builder() .put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 70a41d74a57c5..f6ba8cfed00d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -17,6 +17,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; @@ -277,4 +278,42 @@ public void testRemoteSegmentCleanup() throws Exception { public void testRemoteTranslogCleanup() throws Exception { verifyRemoteStoreCleanup(true); } + + public void testStaleCommitDeletionWithInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, true); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + // Delete is async. + assertBusy(() -> { + int actualFileCount = getFileCount(indexPath); + if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { + assertEquals(numberOfIterations, actualFileCount); + } else { + // As delete is async its possible that the file gets created before the deletion or after + // deletion. + assertTrue(actualFileCount >= 10 || actualFileCount <= 11); + } + }, 30, TimeUnit.SECONDS); + } + + public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, false); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + assertEquals(1, getFileCount(indexPath)); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d89d51c713d70..01c0a12d463ea 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2632,7 +2632,7 @@ public void restoreFromSnapshotAndRemoteStore( assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 7cfaaafcadd39..ddca12d9283f3 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -85,7 +85,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres // Visible for testing static final Set EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing - static final int LAST_N_METADATA_FILES_TO_KEEP = 10; + public static final int LAST_N_METADATA_FILES_TO_KEEP = 10; private final IndexShard indexShard; private final Directory storeDirectory; @@ -200,9 +200,8 @@ private synchronized void syncSegments(boolean isRetry) { // if a new segments_N file is present in local that is not uploaded to remote store yet, it // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. (GitHub issue #4315) if (isRefreshAfterCommit()) { - deleteStaleCommits(); + remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP); } try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { @@ -381,14 +380,6 @@ private String getChecksumOfLocalFile(String file) throws IOException { return localSegmentChecksumMap.get(file); } - private void deleteStaleCommits() { - try { - remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); - } catch (IOException e) { - logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); - } - } - /** * Updates the last refresh time and refresh seq no which is seen by local store. */ diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 119524e8caf8a..da4e9113143af 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -69,6 +69,7 @@ import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.FileChannel; @@ -356,7 +357,8 @@ void recoverFromSnapshotAndRemoteStore( final IndexShard indexShard, Repository repository, RepositoriesService repositoriesService, - ActionListener listener + ActionListener listener, + ThreadPool threadPool ) { try { if (canRecover(indexShard)) { @@ -384,7 +386,10 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); } - RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService); + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool + ); RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( remoteStoreRepository, indexUUID, diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index addd8a24af9c5..ac129aca8baf7 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -27,6 +27,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.FileNotFoundException; import java.io.IOException; @@ -75,6 +76,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final RemoteStoreLockManager mdLockManager; + private final ThreadPool threadPool; + /** * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation * This is achieved by uploading refresh metadata file with the same UUID suffix. @@ -96,15 +99,23 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); + /** + * AtomicBoolean that ensures only one staleCommitDeletion activity is scheduled at a time. + * Visible for testing + */ + protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true); + public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, - RemoteStoreLockManager mdLockManager + RemoteStoreLockManager mdLockManager, + ThreadPool threadPool ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; + this.threadPool = threadPool; init(); } @@ -574,7 +585,7 @@ public Map getSegmentsUploadedToRemoteStore(lon * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); List sortedMetadataFileList = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) { @@ -656,6 +667,33 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } + /** + * Delete stale segment and metadata files asynchronously. + * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner. + * @param lastNMetadataFilesToKeep number of metadata files to keep + */ + public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { + if (canDeleteStaleCommits.compareAndSet(true, false)) { + try { + threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + try { + deleteStaleSegments(lastNMetadataFilesToKeep); + } catch (Exception e) { + logger.info( + "Exception while deleting stale commits from remote segment store, will retry delete post next commit", + e + ); + } finally { + canDeleteStaleCommits.set(true); + } + }); + } catch (Exception e) { + logger.info("Exception occurred while scheduling deleteStaleCommits", e); + canDeleteStaleCommits.set(true); + } + } + } + /* Tries to delete shard level directory if it is empty Return true if it deleted it successfully @@ -680,7 +718,7 @@ private boolean deleteIfEmpty() throws IOException { } public void close() throws IOException { - deleteStaleSegments(0); + deleteStaleSegmentsAsync(0); deleteIfEmpty(); } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 03995d5913fb3..3bec84f287ce4 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -20,6 +20,7 @@ import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.function.Supplier; @@ -34,8 +35,11 @@ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.Dire private final Supplier repositoriesService; - public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService) { + private final ThreadPool threadPool; + + public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; + this.threadPool = threadPool; } @Override @@ -62,7 +66,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh shardId ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index c93d8f0af498a..7d2b14fff4752 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -719,7 +719,8 @@ protected Node( clusterService.setRerouteService(rerouteService); final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( - repositoriesServiceReference::get + repositoriesServiceReference::get, + threadPool ); final CoordinatorStats coordinatorStats = new CoordinatorStats(); diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index d9d87196ca289..32b8fb5a4dc62 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -252,7 +252,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier ); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 7a9cbc12d823b..324315505987b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -24,6 +24,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -41,14 +42,16 @@ public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; + private ThreadPool threadPool; private RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; @Before public void setup() { repositoriesServiceSupplier = mock(Supplier.class); repositoriesService = mock(RepositoriesService.class); + threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } public void testNewDirectory() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 3417e7b0aee04..66e4b9a357b85 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -30,12 +30,14 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.NoSuchFileException; @@ -45,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,6 +59,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doReturn; +import static org.hamcrest.CoreMatchers.is; public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteDirectory remoteDataDirectory; @@ -65,21 +69,31 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; private IndexShard indexShard; private SegmentInfos segmentInfos; + private ThreadPool threadPool; @Before public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); remoteMetadataDirectory = mock(RemoteDirectory.class); mdLockManager = mock(RemoteStoreMetadataLockManager.class); + threadPool = mock(ThreadPool.class); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool + ); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } + + when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); } @After @@ -766,41 +780,76 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } - public void testDeleteStaleCommitsException() throws IOException { + public void testDeleteStaleCommitsException() throws Exception { + populateMetadata(); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( new IOException("Error reading") ); - assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteStaleSegments(5)); + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + } + + public void testDeleteStaleCommitsExceptionWhileScheduling() throws Exception { + populateMetadata(); + doThrow(new IllegalArgumentException()).when(threadPool).executor(any(String.class)); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + } + + public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws Exception { + populateMetadata(); + remoteSegmentStoreDirectory.canDeleteStaleCommits.set(false); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(false))); + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); } - public void testDeleteStaleCommitsWithinThreshold() throws IOException { + public void testDeleteStaleCommitsWithinThreshold() throws Exception { populateMetadata(); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(5); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5); + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); } - public void testDeleteStaleCommitsActualDelete() throws IOException { + public void testDeleteStaleCommitsActualDelete() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); } - public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { + public void testDeleteStaleCommitsActualDeleteIOException() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -813,17 +862,18 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc"); } - public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOException { + public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -836,13 +886,14 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOExc doThrow(new NoSuchFileException(segmentFileWithException)).when(remoteDataDirectory).deleteFile(segmentFileWithException); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 6cd8a16aeed9f..bef9a0ccbeb89 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1835,7 +1835,7 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner, null diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ea9e9342673db..7f3819563dcbd 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -666,7 +666,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException {