diff --git a/CHANGELOG.md b/CHANGELOG.md index 4912d465a87a8..7034abad61782 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637)) - Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292)) - Memory optimisations in _cluster/health API ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492)) +- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index c5438d58e679d..944de326d144c 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -153,6 +153,7 @@ protected Settings nodeSettings(int nodeOrdinal) { // Disable request throttling because some random values in tests might generate too many failures for the S3 client .put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false) .put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT) + .put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false) .put(super.nodeSettings(nodeOrdinal)) .setSecureSettings(secureSettings); diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java index 7db9a0d3ba790..f0e40db965646 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -55,6 +55,14 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase { + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false) + .build(); + } + @Override @Before @SuppressForbidden(reason = "Need to set system property here for AWS SDK v2") diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09832e2b41b6d..ecdd23530c648 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -779,7 +779,10 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED, RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX, RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX, + + // Snapshot related Settings BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING, + BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index b954560c1bc94..0292cecc36a81 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -50,6 +50,7 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.action.StepListener; import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.RepositoryCleanupInProgress; @@ -69,6 +70,7 @@ import org.opensearch.common.Randomness; import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; @@ -180,6 +182,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -353,6 +356,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.Final ); + /** + * Controls the fixed prefix for the snapshot shard blob path. cluster.snapshot.async-deletion.enable + */ + public static final Setting SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING = Setting.boolSetting( + "cluster.snapshot.async-deletion.enable", + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + protected volatile boolean supportURLRepo; private volatile int maxShardBlobDeleteBatch; @@ -446,6 +459,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final String snapshotShardPathPrefix; + private volatile boolean enableAsyncDeletion; + /** * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for * {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart @@ -498,6 +513,8 @@ protected BlobStoreRepository( this.recoverySettings = recoverySettings; this.remoteStoreSettings = new RemoteStoreSettings(clusterService.getSettings(), clusterService.getClusterSettings()); this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings()); + this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion); } @Override @@ -2082,7 +2099,7 @@ private void executeOneStaleIndexDelete( } // Finally, we delete the [base_path]/indexId folder - deleteResult = deleteResult.add(indexEntry.getValue().delete()); // Deleting the index folder + deleteResult = deleteResult.add(deleteContainer(indexEntry.getValue())); // Deleting the index folder logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); return deleteResult; } catch (IOException e) { @@ -2115,6 +2132,21 @@ private void executeOneStaleIndexDelete( })); } + private DeleteResult deleteContainer(BlobContainer container) throws IOException { + long startTime = System.nanoTime(); + DeleteResult deleteResult; + if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) { + // Use deleteAsync and wait for the result + PlainActionFuture future = new PlainActionFuture<>(); + ((AsyncMultiStreamBlobContainer) container).deleteAsync(future); + deleteResult = future.actionGet(); + } else { + deleteResult = container.delete(); + } + logger.debug(new ParameterizedMessage("[{}] Deleted {} in {}ns", metadata.name(), container.path(), startTime - System.nanoTime())); + return deleteResult; + } + /** * Cleans up the remote store directory if needed. *

This method cleans up segments in the remote store directory for deleted indices. @@ -2318,7 +2350,7 @@ void releaseRemoteStoreLocksAndCleanup( * @return A DeleteResult object representing the result of the deletion operation. * @throws IOException If an I/O error occurs during the deletion process. */ - private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException { + private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException, ExecutionException, InterruptedException { // If the provided ShardInfo is null, return a zero DeleteResult if (shardInfo == null) { return DeleteResult.ZERO; @@ -2330,7 +2362,7 @@ private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException { // Iterate over the shards and delete each shard's data for (int i = 0; i < shardInfo.getShardCount(); i++) { // Call the delete method on the shardContainer and accumulate the result - deleteResult = deleteResult.add(shardContainer(shardInfo.getIndexId(), i).delete()); + deleteResult = deleteResult.add(deleteContainer(shardContainer(shardInfo.getIndexId(), i))); } // Return the accumulated DeleteResult @@ -2714,7 +2746,23 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna private void deleteFromContainer(BlobContainer container, List blobs) throws IOException { logger.trace(() -> new ParameterizedMessage("[{}] Deleting {} from [{}]", metadata.name(), blobs, container.path())); - container.deleteBlobsIgnoringIfNotExists(blobs); + long startTime = System.nanoTime(); + if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) { + PlainActionFuture future = new PlainActionFuture<>(); + ((AsyncMultiStreamBlobContainer) container).deleteBlobsAsyncIgnoringIfNotExists(blobs, future); + future.actionGet(); + } else { + container.deleteBlobsIgnoringIfNotExists(blobs); + } + logger.debug( + () -> new ParameterizedMessage( + "[{}] Deletion {} from [{}] took {}ns", + metadata.name(), + blobs, + container.path(), + System.nanoTime() - startTime + ) + ); } private BlobPath indicesPath() { @@ -4565,4 +4613,8 @@ public String toString() { return name; } } + + public void setEnableAsyncDeletion(boolean enableAsyncDeletion) { + this.enableAsyncDeletion = enableAsyncDeletion; + } }