Skip to content

Commit

Permalink
[Remote Store] Introduce RemoteStoreRestoreService to handle restore …
Browse files Browse the repository at this point in the history
…from remote store (#9256)

Signed-off-by: bansvaru <bansvaru@amazon.com>
  • Loading branch information
linuxpi committed Aug 11, 2023
1 parent a1fc31c commit d102a04
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.snapshots.RestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -33,14 +33,14 @@
public final class TransportRestoreRemoteStoreAction extends TransportClusterManagerNodeAction<
RestoreRemoteStoreRequest,
RestoreRemoteStoreResponse> {
private final RestoreService restoreService;
private final RemoteStoreRestoreService restoreService;

@Inject
public TransportRestoreRemoteStoreAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
RestoreService restoreService,
RemoteStoreRestoreService restoreService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand Down Expand Up @@ -84,20 +84,17 @@ protected void clusterManagerOperation(
final ClusterState state,
final ActionListener<RestoreRemoteStoreResponse> listener
) {
restoreService.restoreFromRemoteStore(
request,
ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
})
);
restoreService.restore(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
RestoreRemoteStoreResponse::new
);
} else {
delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo()));
}
}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.recovery;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.RestoreInfo;
import org.opensearch.snapshots.RestoreService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;

/**
* Service responsible for restoring index data from remote store
*
* @opensearch.internal
*/
public class RemoteStoreRestoreService {
private static final Logger logger = LogManager.getLogger(RemoteStoreRestoreService.class);

private final ClusterService clusterService;

private final AllocationService allocationService;

public RemoteStoreRestoreService(ClusterService clusterService, AllocationService allocationService) {
this.clusterService = clusterService;
this.allocationService = allocationService;
}

public void restore(RestoreRemoteStoreRequest request, final ActionListener<RestoreService.RestoreCompletionResponse> listener) {
clusterService.submitStateUpdateTask("restore[remote_store]", new ClusterStateUpdateTask() {
final String restoreUUID = UUIDs.randomBase64UUID();
RestoreInfo restoreInfo = null;

@Override
public ClusterState execute(ClusterState currentState) {
// Updating cluster state
ClusterState.Builder builder = ClusterState.builder(currentState);
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());

List<String> indicesToBeRestored = new ArrayList<>();
int totalShards = 0;
for (String index : request.indices()) {
IndexMetadata currentIndexMetadata = currentState.metadata().index(index);
if (currentIndexMetadata == null) {
// ToDo: Handle index metadata does not exist case. (GitHub #3457)
logger.warn("Remote store restore is not supported for non-existent index. Skipping: {}", index);
continue;
}
if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
IndexMetadata updatedIndexMetadata = currentIndexMetadata;
Map<ShardId, ShardRouting> activeInitializingShards = new HashMap<>();
if (request.restoreAllShards()) {
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
throw new IllegalStateException(
"cannot restore index ["
+ index
+ "] because an open index "
+ "with same name already exists in the cluster. Close the existing index"
);
}
updatedIndexMetadata = IndexMetadata.builder(currentIndexMetadata)
.state(IndexMetadata.State.OPEN)
.version(1 + currentIndexMetadata.getVersion())
.mappingVersion(1 + currentIndexMetadata.getMappingVersion())
.settingsVersion(1 + currentIndexMetadata.getSettingsVersion())
.aliasesVersion(1 + currentIndexMetadata.getAliasesVersion())
.build();
} else {
activeInitializingShards = currentState.routingTable()
.index(index)
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardRouting -> shardRouting.unassigned() == false)
.collect(Collectors.toMap(ShardRouting::shardId, Function.identity()));
}

IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID());

RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
);
rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards);
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(index);
totalShards += updatedIndexMetadata.getNumberOfShards();
} else {
logger.warn("Remote store is not enabled for index: {}", index);
}
}

restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards);

RoutingTable rt = rtBuilder.build();
ClusterState updatedState = builder.metadata(mdBuilder).blocks(blocks).routingTable(rt).build();
return allocationService.reroute(updatedState, "restored from remote store");
}

@Override
public void onFailure(String source, Exception e) {
logger.warn("failed to restore from remote store", e);
listener.onFailure(e);
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new RestoreService.RestoreCompletionResponse(restoreUUID, null, restoreInfo));
}
});

}
}
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
Expand Down Expand Up @@ -945,6 +946,10 @@ protected Node(
indicesService,
clusterInfoService::getClusterInfo
);
RemoteStoreRestoreService remoteStoreRestoreService = new RemoteStoreRestoreService(
clusterService,
clusterModule.getAllocationService()
);

final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
settings,
Expand Down Expand Up @@ -1141,6 +1146,7 @@ protected Node(
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService);
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ public static final class RestoreCompletionResponse {
private final Snapshot snapshot;
private final RestoreInfo restoreInfo;

private RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) {
public RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) {
this.uuid = uuid;
this.snapshot = snapshot;
this.restoreInfo = restoreInfo;
Expand Down

0 comments on commit d102a04

Please sign in to comment.