From a219a71d39d92eaa96c4bda9369601e9a13f2dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Thu, 22 Jun 2023 01:43:02 +0530 Subject: [PATCH 1/8] [Remote Store] Moving stale commit deletion to async flow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../RemoteStoreBaseIntegTestCase.java | 2 ++ .../opensearch/remotestore/RemoteStoreIT.java | 27 +++++++++++++++++++ .../shard/RemoteStoreRefreshListener.java | 26 +++++++++++------- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index d226d0d757638..121b866df415e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -15,6 +15,7 @@ import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; @@ -59,6 +60,7 @@ private Settings defaultIndexSettings() { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 70a41d74a57c5..1f2b03d893344 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -17,6 +17,7 @@ import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; @@ -277,4 +278,30 @@ public void testRemoteSegmentCleanup() throws Exception { public void testRemoteTranslogCleanup() throws Exception { verifyRemoteStoreCleanup(true); } + + public void testStaleCommitDeletion() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + int numberOfIterations = randomIntBetween(5, 15); + boolean invokeFlush = randomBoolean(); + indexData(numberOfIterations, invokeFlush); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + if (invokeFlush) { + assertBusy(() -> { + try { + int expectedFileCount = numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP + ? numberOfIterations + : 11; + assertEquals(expectedFileCount, getFileCount(indexPath)); + } catch (Exception e) {} + }, 30, TimeUnit.SECONDS); + } else { + assertEquals(1, getFileCount(indexPath)); + } + } } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 7cfaaafcadd39..7d0c0cf5669fd 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -85,7 +85,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres // Visible for testing static final Set EXCLUDE_FILES = Set.of("write.lock"); // Visible for testing - static final int LAST_N_METADATA_FILES_TO_KEEP = 10; + public static final int LAST_N_METADATA_FILES_TO_KEEP = 10; private final IndexShard indexShard; private final Directory storeDirectory; @@ -93,6 +93,10 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final RemoteRefreshSegmentTracker segmentTracker; private final Map localSegmentChecksumMap; private long primaryTerm; + /** + * Semaphore that ensures only one staleCommitDeletion activity is scheduled at a time. + */ + private final Semaphore staleCommitDeletionPermits = new Semaphore(1); /** * Semaphore that ensures there is only 1 retry scheduled at any time. @@ -200,9 +204,8 @@ private synchronized void syncSegments(boolean isRetry) { // if a new segments_N file is present in local that is not uploaded to remote store yet, it // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh. - // Ideally, we want this to be done in async flow. (GitHub issue #4315) - if (isRefreshAfterCommit()) { - deleteStaleCommits(); + if (isRefreshAfterCommit() && staleCommitDeletionPermits.tryAcquire() == true) { + deleteStaleCommitsAsync(ThreadPool.Names.REMOTE_PURGE, staleCommitDeletionPermits::release); } try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { @@ -381,12 +384,15 @@ private String getChecksumOfLocalFile(String file) throws IOException { return localSegmentChecksumMap.get(file); } - private void deleteStaleCommits() { - try { - remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); - } catch (IOException e) { - logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); - } + private void deleteStaleCommitsAsync(String threadpoolName, Runnable onCompletion) { + indexShard.getThreadPool().executor(threadpoolName).execute(() -> { + try { + remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); + } catch (IOException e) { + logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); + } + onCompletion.run(); + }); } /** From d85c97ed32b3f94724ffe1752578d17985da2177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Thu, 22 Jun 2023 22:32:47 +0530 Subject: [PATCH 2/8] Addressing Comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../RemoteStoreBaseIntegTestCase.java | 8 +++++++- .../opensearch/remotestore/RemoteStoreIT.java | 15 ++++++++++----- .../shard/RemoteStoreRefreshListener.java | 19 ++++++++++++------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 121b866df415e..336646b35b5a6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -60,7 +60,6 @@ private Settings defaultIndexSettings() { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE) .build(); } @@ -76,6 +75,13 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return remoteStoreIndexSettings(numberOfReplicas, 1); } + protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) { + return Settings.builder() + .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit) + .build(); + } + protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) { return Settings.builder() .put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 1f2b03d893344..7fad855341837 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -281,7 +281,7 @@ public void testRemoteTranslogCleanup() throws Exception { public void testStaleCommitDeletion() throws Exception { internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000)); int numberOfIterations = randomIntBetween(5, 15); boolean invokeFlush = randomBoolean(); indexData(numberOfIterations, invokeFlush); @@ -292,12 +292,17 @@ public void testStaleCommitDeletion() throws Exception { .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); if (invokeFlush) { + // Delete is async. assertBusy(() -> { try { - int expectedFileCount = numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP - ? numberOfIterations - : 11; - assertEquals(expectedFileCount, getFileCount(indexPath)); + int actualFileCount = getFileCount(indexPath); + if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { + assertEquals(numberOfIterations, actualFileCount); + } else { + // As delete is async its possible that the file gets created before the deletion or after + // deletion. + assertTrue(actualFileCount >= 10 || actualFileCount <= 11); + } } catch (Exception e) {} }, 30, TimeUnit.SECONDS); } else { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 7d0c0cf5669fd..a2aacba944523 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -385,14 +385,19 @@ private String getChecksumOfLocalFile(String file) throws IOException { } private void deleteStaleCommitsAsync(String threadpoolName, Runnable onCompletion) { - indexShard.getThreadPool().executor(threadpoolName).execute(() -> { - try { - remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); - } catch (IOException e) { - logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); - } + try { + indexShard.getThreadPool().executor(threadpoolName).execute(() -> { + try { + remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); + } catch (IOException e) { + logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); + } + onCompletion.run(); + }); + } catch (Exception e) { + logger.info("Exception occurred while scheduling deleteStaleCommits", e); onCompletion.run(); - }); + } } /** From 73ae2876215d772c9f02e822ade5f6f55ebbac7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Mon, 26 Jun 2023 12:57:04 +0530 Subject: [PATCH 3/8] Seperating IT for delete stale commit with and without flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../opensearch/remotestore/RemoteStoreIT.java | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 7fad855341837..c19dbb218dd6c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -279,34 +279,43 @@ public void testRemoteTranslogCleanup() throws Exception { verifyRemoteStoreCleanup(true); } - public void testStaleCommitDeletion() throws Exception { + public void testStaleCommitDeletionWithInvokeFlush() throws Exception { internalCluster().startDataOnlyNodes(3); createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000)); int numberOfIterations = randomIntBetween(5, 15); - boolean invokeFlush = randomBoolean(); - indexData(numberOfIterations, invokeFlush); + indexData(numberOfIterations, true); String indexUUID = client().admin() .indices() .prepareGetSettings(INDEX_NAME) .get() .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); - if (invokeFlush) { - // Delete is async. - assertBusy(() -> { - try { - int actualFileCount = getFileCount(indexPath); - if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { - assertEquals(numberOfIterations, actualFileCount); - } else { - // As delete is async its possible that the file gets created before the deletion or after - // deletion. - assertTrue(actualFileCount >= 10 || actualFileCount <= 11); - } - } catch (Exception e) {} - }, 30, TimeUnit.SECONDS); - } else { - assertEquals(1, getFileCount(indexPath)); - } + // Delete is async. + assertBusy(() -> { + try { + int actualFileCount = getFileCount(indexPath); + if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { + assertEquals(numberOfIterations, actualFileCount); + } else { + // As delete is async its possible that the file gets created before the deletion or after + // deletion. + assertTrue(actualFileCount >= 10 || actualFileCount <= 11); + } + } catch (Exception e) {} + }, 30, TimeUnit.SECONDS); + } + + public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { + internalCluster().startDataOnlyNodes(3); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, false); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); + assertEquals(1, getFileCount(indexPath)); } } From b8c6e8a270e484dcb45a377f0e29ba65197e18ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Mon, 26 Jun 2023 19:23:08 +0530 Subject: [PATCH 4/8] Changes to make all the deleteStaleSegments call async MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../shard/RemoteStoreRefreshListener.java | 24 +---------- .../store/RemoteSegmentStoreDirectory.java | 43 +++++++++++++++++-- .../RemoteSegmentStoreDirectoryFactory.java | 8 +++- .../main/java/org/opensearch/node/Node.java | 3 +- .../opensearch/index/IndexModuleTests.java | 2 +- ...moteSegmentStoreDirectoryFactoryTests.java | 5 ++- .../RemoteSegmentStoreDirectoryTests.java | 20 ++++++--- .../snapshots/SnapshotResiliencyTests.java | 2 +- .../index/shard/IndexShardTestCase.java | 2 +- 9 files changed, 71 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index a2aacba944523..ddca12d9283f3 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -93,10 +93,6 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final RemoteRefreshSegmentTracker segmentTracker; private final Map localSegmentChecksumMap; private long primaryTerm; - /** - * Semaphore that ensures only one staleCommitDeletion activity is scheduled at a time. - */ - private final Semaphore staleCommitDeletionPermits = new Semaphore(1); /** * Semaphore that ensures there is only 1 retry scheduled at any time. @@ -204,8 +200,8 @@ private synchronized void syncSegments(boolean isRetry) { // if a new segments_N file is present in local that is not uploaded to remote store yet, it // is considered as a first refresh post commit. A cleanup of stale commit files is triggered. // This is done to avoid delete post each refresh. - if (isRefreshAfterCommit() && staleCommitDeletionPermits.tryAcquire() == true) { - deleteStaleCommitsAsync(ThreadPool.Names.REMOTE_PURGE, staleCommitDeletionPermits::release); + if (isRefreshAfterCommit()) { + remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP); } try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { @@ -384,22 +380,6 @@ private String getChecksumOfLocalFile(String file) throws IOException { return localSegmentChecksumMap.get(file); } - private void deleteStaleCommitsAsync(String threadpoolName, Runnable onCompletion) { - try { - indexShard.getThreadPool().executor(threadpoolName).execute(() -> { - try { - remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); - } catch (IOException e) { - logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); - } - onCompletion.run(); - }); - } catch (Exception e) { - logger.info("Exception occurred while scheduling deleteStaleCommits", e); - onCompletion.run(); - } - } - /** * Updates the last refresh time and refresh seq no which is seen by local store. */ diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index addd8a24af9c5..9cfc06591b728 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -27,6 +27,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.FileNotFoundException; import java.io.IOException; @@ -75,6 +76,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private final RemoteStoreLockManager mdLockManager; + private final ThreadPool threadPool; + /** * To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation * This is achieved by uploading refresh metadata file with the same UUID suffix. @@ -96,15 +99,22 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class); + /** + * AtomicBoolean that ensures only one staleCommitDeletion activity is scheduled at a time. + */ + private final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true); + public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory, - RemoteStoreLockManager mdLockManager + RemoteStoreLockManager mdLockManager, + ThreadPool threadPool ) throws IOException { super(remoteDataDirectory); this.remoteDataDirectory = remoteDataDirectory; this.remoteMetadataDirectory = remoteMetadataDirectory; this.mdLockManager = mdLockManager; + this.threadPool = threadPool; init(); } @@ -574,7 +584,7 @@ public Map getSegmentsUploadedToRemoteStore(lon * @param lastNMetadataFilesToKeep number of metadata files to keep * @throws IOException in case of I/O error while reading from / writing to remote segment store */ - public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { + private void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException { Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); List sortedMetadataFileList = metadataFiles.stream().sorted(METADATA_FILENAME_COMPARATOR).collect(Collectors.toList()); if (sortedMetadataFileList.size() <= lastNMetadataFilesToKeep) { @@ -656,6 +666,33 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } + /** + * Delete stale segment and metadata files asynchronously. + * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner. + * @param lastNMetadataFilesToKeep number of metadata files to keep + */ + public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { + if (canDeleteStaleCommits.compareAndSet(true, false)) { + try { + threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { + try { + deleteStaleSegments(lastNMetadataFilesToKeep); + } catch (Exception e) { + logger.info( + "Exception while deleting stale commits from remote segment store, will retry delete post next commit", + e + ); + } finally { + canDeleteStaleCommits.set(true); + } + }); + } catch (Exception e) { + logger.info("Exception occurred while scheduling deleteStaleCommits", e); + canDeleteStaleCommits.set(true); + } + } + } + /* Tries to delete shard level directory if it is empty Return true if it deleted it successfully @@ -680,7 +717,7 @@ private boolean deleteIfEmpty() throws IOException { } public void close() throws IOException { - deleteStaleSegments(0); + deleteStaleSegmentsAsync(0); deleteIfEmpty(); } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 03995d5913fb3..3bec84f287ce4 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -20,6 +20,7 @@ import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.function.Supplier; @@ -34,8 +35,11 @@ public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.Dire private final Supplier repositoriesService; - public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService) { + private final ThreadPool threadPool; + + public RemoteSegmentStoreDirectoryFactory(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; + this.threadPool = threadPool; } @Override @@ -62,7 +66,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh shardId ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool); } catch (RepositoryMissingException e) { throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", e); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3742c817118da..d3655671b516d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -717,7 +717,8 @@ protected Node( clusterService.setRerouteService(rerouteService); final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory = new RemoteSegmentStoreDirectoryFactory( - repositoriesServiceReference::get + repositoriesServiceReference::get, + threadPool ); final IndicesService indicesService = new IndicesService( diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index d9d87196ca289..32b8fb5a4dc62 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -252,7 +252,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier ); } diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java index 7a9cbc12d823b..324315505987b 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactoryTests.java @@ -24,6 +24,7 @@ import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.Path; @@ -41,14 +42,16 @@ public class RemoteSegmentStoreDirectoryFactoryTests extends OpenSearchTestCase private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; + private ThreadPool threadPool; private RemoteSegmentStoreDirectoryFactory remoteSegmentStoreDirectoryFactory; @Before public void setup() { repositoriesServiceSupplier = mock(Supplier.class); repositoriesService = mock(RepositoriesService.class); + threadPool = mock(ThreadPool.class); when(repositoriesServiceSupplier.get()).thenReturn(repositoriesService); - remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier); + remoteSegmentStoreDirectoryFactory = new RemoteSegmentStoreDirectoryFactory(repositoriesServiceSupplier, threadPool); } public void testNewDirectory() throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 3417e7b0aee04..58a208a634123 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -36,6 +36,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.file.NoSuchFileException; @@ -65,14 +66,21 @@ public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; private IndexShard indexShard; private SegmentInfos segmentInfos; + private ThreadPool threadPool; @Before public void setup() throws IOException { remoteDataDirectory = mock(RemoteDirectory.class); remoteMetadataDirectory = mock(RemoteDirectory.class); mdLockManager = mock(RemoteStoreMetadataLockManager.class); + threadPool = mock(ThreadPool.class); - remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(remoteDataDirectory, remoteMetadataDirectory, mdLockManager); + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDataDirectory, + remoteMetadataDirectory, + mdLockManager, + threadPool + ); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); @@ -771,7 +779,7 @@ public void testDeleteStaleCommitsException() throws IOException { new IOException("Error reading") ); - assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteStaleSegments(5)); + assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5)); } public void testDeleteStaleCommitsWithinThreshold() throws IOException { @@ -779,7 +787,7 @@ public void testDeleteStaleCommitsWithinThreshold() throws IOException { // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(5); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5); verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); } @@ -790,7 +798,7 @@ public void testDeleteStaleCommitsActualDelete() throws IOException { // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; @@ -813,7 +821,7 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { doThrow(new IOException("Error")).when(remoteDataDirectory).deleteFile(segmentFileWithException); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; @@ -836,7 +844,7 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOExc doThrow(new NoSuchFileException(segmentFileWithException)).when(remoteDataDirectory).deleteFile(segmentFileWithException); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=2 here so that oldest 1 metadata file will be deleted - remoteSegmentStoreDirectory.deleteStaleSegments(2); + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); for (String metadata : metadataFilenameContentMapping.get("metadata__1__5__abc").values()) { String uploadedFilename = metadata.split(RemoteSegmentStoreDirectory.UploadedSegmentMetadata.SEPARATOR)[1]; diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0bb2b604e8f1a..88899a1b282af 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1835,7 +1835,7 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index ea9e9342673db..7f3819563dcbd 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -666,7 +666,7 @@ protected RemoteSegmentStoreDirectory createRemoteSegmentStoreDirectory(ShardId RemoteStoreLockManager remoteStoreLockManager = new RemoteStoreMetadataLockManager( new RemoteBufferedOutputDirectory(getBlobContainer(remoteShardPath.resolveIndex())) ); - return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager); + return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, remoteStoreLockManager, threadPool); } private RemoteDirectory newRemoteDirectory(Path f) throws IOException { From 07cbe092800e327db65812446666cb7f3d0ffa9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Tue, 27 Jun 2023 11:15:54 +0530 Subject: [PATCH 5/8] Fixing tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../opensearch/remotestore/RemoteStoreIT.java | 4 +- .../store/RemoteSegmentStoreDirectory.java | 3 +- .../RemoteSegmentStoreDirectoryTests.java | 44 ++++++++++++++++++- 3 files changed, 47 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index c19dbb218dd6c..089ca1dd7cd30 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -281,7 +281,7 @@ public void testRemoteTranslogCleanup() throws Exception { public void testStaleCommitDeletionWithInvokeFlush() throws Exception { internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000)); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, true); String indexUUID = client().admin() @@ -307,7 +307,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception { public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000)); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, false); String indexUUID = client().admin() diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 9cfc06591b728..ac129aca8baf7 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -101,8 +101,9 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implement /** * AtomicBoolean that ensures only one staleCommitDeletion activity is scheduled at a time. + * Visible for testing */ - private final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true); + protected final AtomicBoolean canDeleteStaleCommits = new AtomicBoolean(true); public RemoteSegmentStoreDirectory( RemoteDirectory remoteDataDirectory, diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 58a208a634123..bd22220e9b1b9 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -30,6 +30,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -46,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -83,11 +85,14 @@ public void setup() throws IOException { ); Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build(); + ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); try (Store store = indexShard.store()) { segmentInfos = store.readLastCommittedSegmentsInfo(); } + + when(threadPool.executor(ThreadPool.Names.REMOTE_PURGE)).thenReturn(executorService); } @After @@ -775,11 +780,44 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { } public void testDeleteStaleCommitsException() throws IOException { + populateMetadata(); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( new IOException("Error reading") ); - assertThrows(IOException.class, () -> remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5)); + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); + } + + public void testDeleteStaleCommitsExceptionWhileScheduling() throws IOException { + populateMetadata(); + doThrow(new IllegalArgumentException()).when(threadPool).executor(any(String.class)); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); + } + + public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws IOException { + populateMetadata(); + remoteSegmentStoreDirectory.canDeleteStaleCommits.set(false); + + // popluateMetadata() adds stub to return 3 metadata files + // We are passing lastNMetadataFilesToKeep=2 here to validate that in case of exception deleteFile is not + // invoked + remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + + verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); + assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get() == false); } public void testDeleteStaleCommitsWithinThreshold() throws IOException { @@ -790,6 +828,7 @@ public void testDeleteStaleCommitsWithinThreshold() throws IOException { remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5); verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); + assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } public void testDeleteStaleCommitsActualDelete() throws IOException { @@ -806,6 +845,7 @@ public void testDeleteStaleCommitsActualDelete() throws IOException { } ; verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); + assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { @@ -829,6 +869,7 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { } ; verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc"); + assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOException { @@ -852,6 +893,7 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOExc } ; verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); + assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } public void testSegmentMetadataCurrentVersion() { From 051a512e582727b425d95898956bc5441faabd2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Thu, 29 Jun 2023 00:41:32 +0530 Subject: [PATCH 6/8] Fixing build post resolving conflicts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../main/java/org/opensearch/index/shard/IndexShard.java | 2 +- .../java/org/opensearch/index/shard/StoreRecovery.java | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d89d51c713d70..01c0a12d463ea 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2632,7 +2632,7 @@ public void restoreFromSnapshotAndRemoteStore( assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " + recoveryState.getRecoverySource(); StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener); + storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 119524e8caf8a..da4e9113143af 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -69,6 +69,7 @@ import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.nio.channels.FileChannel; @@ -356,7 +357,8 @@ void recoverFromSnapshotAndRemoteStore( final IndexShard indexShard, Repository repository, RepositoriesService repositoriesService, - ActionListener listener + ActionListener listener, + ThreadPool threadPool ) { try { if (canRecover(indexShard)) { @@ -384,7 +386,10 @@ void recoverFromSnapshotAndRemoteStore( remoteStoreRepository = shallowCopyShardMetadata.getRemoteStoreRepository(); } - RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService); + RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory( + () -> repositoriesService, + threadPool + ); RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory( remoteStoreRepository, indexUUID, From 45b823323eb96d8db3c84ef9e64804f6444f810b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Thu, 29 Jun 2023 22:46:07 +0530 Subject: [PATCH 7/8] Adding assertBusy statement in unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../RemoteSegmentStoreDirectoryTests.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index bd22220e9b1b9..66e4b9a357b85 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -59,6 +59,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doReturn; +import static org.hamcrest.CoreMatchers.is; public class RemoteSegmentStoreDirectoryTests extends IndexShardTestCase { private RemoteDirectory remoteDataDirectory; @@ -779,7 +780,7 @@ public void testIncorrectChecksumCorruptIndexException() throws IOException { assertThrows(CorruptIndexException.class, () -> remoteSegmentStoreDirectory.init()); } - public void testDeleteStaleCommitsException() throws IOException { + public void testDeleteStaleCommitsException() throws Exception { populateMetadata(); when(remoteMetadataDirectory.listFilesByPrefix(RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX)).thenThrow( new IOException("Error reading") @@ -790,11 +791,11 @@ public void testDeleteStaleCommitsException() throws IOException { // invoked remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); - assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } - public void testDeleteStaleCommitsExceptionWhileScheduling() throws IOException { + public void testDeleteStaleCommitsExceptionWhileScheduling() throws Exception { populateMetadata(); doThrow(new IllegalArgumentException()).when(threadPool).executor(any(String.class)); @@ -803,11 +804,11 @@ public void testDeleteStaleCommitsExceptionWhileScheduling() throws IOException // invoked remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); - assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } - public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws IOException { + public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws Exception { populateMetadata(); remoteSegmentStoreDirectory.canDeleteStaleCommits.set(false); @@ -816,22 +817,22 @@ public void testDeleteStaleCommitsWithDeletionAlreadyInProgress() throws IOExcep // invoked remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(2); + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(false))); verify(remoteMetadataDirectory, times(0)).deleteFile(any(String.class)); - assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get() == false); } - public void testDeleteStaleCommitsWithinThreshold() throws IOException { + public void testDeleteStaleCommitsWithinThreshold() throws Exception { populateMetadata(); // popluateMetadata() adds stub to return 3 metadata files // We are passing lastNMetadataFilesToKeep=5 here so that none of the metadata files will be deleted remoteSegmentStoreDirectory.deleteStaleSegmentsAsync(5); + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).openInput(any(String.class), eq(IOContext.DEFAULT)); - assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } - public void testDeleteStaleCommitsActualDelete() throws IOException { + public void testDeleteStaleCommitsActualDelete() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -844,11 +845,11 @@ public void testDeleteStaleCommitsActualDelete() throws IOException { verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); - assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } - public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { + public void testDeleteStaleCommitsActualDeleteIOException() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -868,11 +869,11 @@ public void testDeleteStaleCommitsActualDeleteIOException() throws IOException { verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory, times(0)).deleteFile("metadata__1__5__abc"); - assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } - public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOException { + public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws Exception { Map> metadataFilenameContentMapping = populateMetadata(); remoteSegmentStoreDirectory.init(); @@ -892,8 +893,8 @@ public void testDeleteStaleCommitsActualDeleteNoSuchFileException() throws IOExc verify(remoteDataDirectory).deleteFile(uploadedFilename); } ; + assertBusy(() -> assertThat(remoteSegmentStoreDirectory.canDeleteStaleCommits.get(), is(true))); verify(remoteMetadataDirectory).deleteFile("metadata__1__5__abc"); - assertTrue(remoteSegmentStoreDirectory.canDeleteStaleCommits.get()); } public void testSegmentMetadataCurrentVersion() { From 5c446f364f6b75e3ca6d7c0e4ac2c476207065fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dharmesh=20=F0=9F=92=A4?= Date: Fri, 30 Jun 2023 16:57:05 +0530 Subject: [PATCH 8/8] Removing catch exception from IT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dharmesh 💤 --- .../opensearch/remotestore/RemoteStoreIT.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 089ca1dd7cd30..f6ba8cfed00d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -292,16 +292,14 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception { Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); // Delete is async. assertBusy(() -> { - try { - int actualFileCount = getFileCount(indexPath); - if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { - assertEquals(numberOfIterations, actualFileCount); - } else { - // As delete is async its possible that the file gets created before the deletion or after - // deletion. - assertTrue(actualFileCount >= 10 || actualFileCount <= 11); - } - } catch (Exception e) {} + int actualFileCount = getFileCount(indexPath); + if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { + assertEquals(numberOfIterations, actualFileCount); + } else { + // As delete is async its possible that the file gets created before the deletion or after + // deletion. + assertTrue(actualFileCount >= 10 || actualFileCount <= 11); + } }, 30, TimeUnit.SECONDS); }