From d30199f32ed3010bf2b80e11c4abe6c16a8ee279 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Sat, 12 Aug 2023 07:38:11 +0530 Subject: [PATCH] =?UTF-8?q?[Backport=202.x][Remote=20Store]=20Introduce=20?= =?UTF-8?q?RemoteStoreRestoreService=20to=20handle=20restore=20=E2=80=A6?= =?UTF-8?q?=20(#9261)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Signed-off-by: bansvaru --- .../TransportRestoreRemoteStoreAction.java | 33 ++-- .../recovery/RemoteStoreRestoreService.java | 153 ++++++++++++++++++ .../main/java/org/opensearch/node/Node.java | 6 + .../opensearch/snapshots/RestoreService.java | 2 +- 4 files changed, 175 insertions(+), 19 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java index 613bf078b2c96..992f706a4d243 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java @@ -19,7 +19,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.snapshots.RestoreService; +import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -33,14 +33,14 @@ public final class TransportRestoreRemoteStoreAction extends TransportClusterManagerNodeAction< RestoreRemoteStoreRequest, RestoreRemoteStoreResponse> { - private final RestoreService restoreService; + private final RemoteStoreRestoreService restoreService; @Inject public TransportRestoreRemoteStoreAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - RestoreService restoreService, + RemoteStoreRestoreService restoreService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver ) { @@ -84,20 +84,17 @@ protected void clusterManagerOperation( final ClusterState state, final ActionListener listener ) { - restoreService.restoreFromRemoteStore( - request, - ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { - if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener( - clusterService, - restoreCompletionResponse, - delegatedListener, - RestoreRemoteStoreResponse::new - ); - } else { - delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo())); - } - }) - ); + restoreService.restore(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { + if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { + RestoreClusterStateListener.createAndRegisterListener( + clusterService, + restoreCompletionResponse, + delegatedListener, + RestoreRemoteStoreResponse::new + ); + } else { + delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo())); + } + })); } } diff --git a/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java new file mode 100644 index 0000000000000..50e051ea8bf5e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.recovery; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.repositories.IndexId; +import org.opensearch.snapshots.RestoreInfo; +import org.opensearch.snapshots.RestoreService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; + +/** + * Service responsible for restoring index data from remote store + * + * @opensearch.internal + */ +public class RemoteStoreRestoreService { + private static final Logger logger = LogManager.getLogger(RemoteStoreRestoreService.class); + + private final ClusterService clusterService; + + private final AllocationService allocationService; + + public RemoteStoreRestoreService(ClusterService clusterService, AllocationService allocationService) { + this.clusterService = clusterService; + this.allocationService = allocationService; + } + + public void restore(RestoreRemoteStoreRequest request, final ActionListener listener) { + clusterService.submitStateUpdateTask("restore[remote_store]", new ClusterStateUpdateTask() { + final String restoreUUID = UUIDs.randomBase64UUID(); + RestoreInfo restoreInfo = null; + + @Override + public ClusterState execute(ClusterState currentState) { + // Updating cluster state + ClusterState.Builder builder = ClusterState.builder(currentState); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); + + List indicesToBeRestored = new ArrayList<>(); + int totalShards = 0; + for (String index : request.indices()) { + IndexMetadata currentIndexMetadata = currentState.metadata().index(index); + if (currentIndexMetadata == null) { + // ToDo: Handle index metadata does not exist case. (GitHub #3457) + logger.warn("Remote store restore is not supported for non-existent index. Skipping: {}", index); + continue; + } + if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) { + IndexMetadata updatedIndexMetadata = currentIndexMetadata; + Map activeInitializingShards = new HashMap<>(); + if (request.restoreAllShards()) { + if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) { + throw new IllegalStateException( + "cannot restore index [" + + index + + "] because an open index " + + "with same name already exists in the cluster. Close the existing index" + ); + } + updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata) + .state(IndexMetadata.State.OPEN) + .version(1 + currentIndexMetadata.getVersion()) + .mappingVersion(1 + currentIndexMetadata.getMappingVersion()) + .settingsVersion(1 + currentIndexMetadata.getSettingsVersion()) + .aliasesVersion(1 + currentIndexMetadata.getAliasesVersion()) + .build(); + } else { + activeInitializingShards = currentState.routingTable() + .index(index) + .shards() + .values() + .stream() + .map(IndexShardRoutingTable::primaryShard) + .filter(shardRouting -> shardRouting.unassigned() == false) + .collect(Collectors.toMap(ShardRouting::shardId, Function.identity())); + } + + IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID()); + + RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource( + restoreUUID, + updatedIndexMetadata.getCreationVersion(), + indexId + ); + rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards); + blocks.updateBlocks(updatedIndexMetadata); + mdBuilder.put(updatedIndexMetadata, true); + indicesToBeRestored.add(index); + totalShards += updatedIndexMetadata.getNumberOfShards(); + } else { + logger.warn("Remote store is not enabled for index: {}", index); + } + } + + restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards); + + RoutingTable rt = rtBuilder.build(); + ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build(); + return allocationService.reroute(updatedState, "restored from remote store"); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to restore from remote store", e); + listener.onFailure(e); + } + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new RestoreService.RestoreCompletionResponse(restoreUUID, null, restoreInfo)); + } + }); + + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 0a58aba8c35e6..ad902c2ac855f 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,6 +46,7 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; +import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; @@ -947,6 +948,10 @@ protected Node( indicesService, clusterInfoService::getClusterInfo ); + RemoteStoreRestoreService remoteStoreRestoreService = new RemoteStoreRestoreService( + clusterService, + clusterModule.getAllocationService() + ); final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( settings, @@ -1143,6 +1148,7 @@ protected Node( b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService); b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); + b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService); b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 029c5b0377d26..1775adb2f0903 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -1003,7 +1003,7 @@ public static final class RestoreCompletionResponse { private final Snapshot snapshot; private final RestoreInfo restoreInfo; - private RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) { + public RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) { this.uuid = uuid; this.snapshot = snapshot; this.restoreInfo = restoreInfo;