From e64fd535d15e56eb7a1e891bb0e6543e0cb83cd1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 14 Dec 2018 11:39:23 +0100 Subject: [PATCH] SNAPSHOTS: Allow Parallel Restore Operations (#36397) * Enable parallel restore operations * Add uuid to restore in progress entries to uniquely identify them * Adjust restore in progress entries to be a map in cluster state * Added tests for: * Parallel restore from two different snapshots * Parallel restore from a single snapshot to different indices to test uuid identifiers are correctly used by `RestoreService` and routing allocator * Parallel restore with waiting for completion to test transport actions correctly use uuid identifiers --- docs/reference/indices/recovery.asciidoc | 3 +- .../TransportRestoreSnapshotAction.java | 5 +- .../elasticsearch/cluster/ClusterModule.java | 2 +- .../cluster/RestoreInProgress.java | 113 +++++++++---- .../cluster/routing/RecoverySource.java | 27 +++- .../RestoreInProgressAllocationDecider.java | 22 ++- .../snapshots/RestoreService.java | 148 ++++++++++-------- .../snapshots/SnapshotsService.java | 2 +- .../cluster/ClusterModuleTests.java | 4 +- .../cluster/ClusterStateDiffIT.java | 6 +- .../cluster/routing/ShardRoutingTests.java | 2 +- .../cluster/routing/UnassignedInfoTests.java | 4 +- .../NodeVersionAllocationDeciderTests.java | 6 +- .../allocation/ThrottlingAllocationTests.java | 12 +- ...storeInProgressAllocationDeciderTests.java | 8 +- .../ClusterSerializationTests.java | 8 +- .../gateway/PrimaryShardAllocatorTests.java | 3 +- .../index/shard/IndexShardTests.java | 2 +- .../indices/recovery/IndexRecoveryIT.java | 1 + .../SharedClusterSnapshotRestoreIT.java | 125 ++++++++++++++- .../cluster/routing/TestShardRouting.java | 1 + .../index/shard/IndexShardTestCase.java | 4 +- .../xpack/ccr/CcrRepositoryIT.java | 5 +- .../SourceOnlySnapshotShardTests.java | 4 +- 24 files changed, 376 insertions(+), 141 deletions(-) diff --git a/docs/reference/indices/recovery.asciidoc b/docs/reference/indices/recovery.asciidoc index 49f58e645bcda..0929b36e7742d 100644 --- a/docs/reference/indices/recovery.asciidoc +++ b/docs/reference/indices/recovery.asciidoc @@ -90,7 +90,8 @@ Response: "repository" : "my_repository", "snapshot" : "my_snapshot", "index" : "index1", - "version" : "{version}" + "version" : "{version}", + "restoreUUID": "PDh1ZAOaRbiGIVtCvZOMww" }, "target" : { "id" : "ryqJ5lO5S4-lSFbGntkEkg", diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index 6ca6697ac2ed5..199d1ed1a02c8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -93,12 +93,13 @@ protected void masterOperation(final RestoreSnapshotRequest request, final Clust public void onResponse(RestoreCompletionResponse restoreCompletionResponse) { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); + String uuid = restoreCompletionResponse.getUuid(); ClusterStateListener clusterStateListener = new ClusterStateListener() { @Override public void clusterChanged(ClusterChangedEvent changedEvent) { - final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot); - final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot); + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); if (prevEntry == null) { // When there is a master failure after a restore has been started, this listener might not be registered // on the current master and as such it might miss some intermediary cluster states due to batching. diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 9bb608cc7ac16..1d8a8735dffae 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -118,7 +118,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List> getClusterStateCustomSuppliers(List clusterPlugins) { final Map> customSupplier = new HashMap<>(); customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new); - customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new); + customSupplier.put(RestoreInProgress.TYPE, () -> new RestoreInProgress.Builder().build()); customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new); for (ClusterPlugin plugin : clusterPlugins) { Map> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier(); diff --git a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java index 138788251c90a..0c87eb3b131bc 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; @@ -32,36 +33,33 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.UUID; /** * Meta data about restore processes that are currently executing */ -public class RestoreInProgress extends AbstractNamedDiffable implements Custom { +public class RestoreInProgress extends AbstractNamedDiffable implements Custom, Iterable { + + /** + * Fallback UUID used for restore operations that were started before v7.0 and don't have a uuid in the cluster state. + */ + public static final String BWC_UUID = new UUID(0, 0).toString(); public static final String TYPE = "restore"; - private final List entries; + private final ImmutableOpenMap entries; /** * Constructs new restore metadata * - * @param entries list of currently running restore processes + * @param entries map of currently running restore processes keyed by their restore uuid */ - public RestoreInProgress(Entry... entries) { - this.entries = Arrays.asList(entries); - } - - /** - * Returns list of currently running restore processes - * - * @return list of currently running restore processes - */ - public List entries() { - return this.entries; + private RestoreInProgress(ImmutableOpenMap entries) { + this.entries = entries; } @Override @@ -83,20 +81,48 @@ public int hashCode() { @Override public String toString() { - StringBuilder builder = new StringBuilder("RestoreInProgress["); - for (int i = 0; i < entries.size(); i++) { - builder.append(entries.get(i).snapshot().getSnapshotId().getName()); - if (i + 1 < entries.size()) { - builder.append(","); - } + return new StringBuilder("RestoreInProgress[").append(entries).append("]").toString(); + } + + public Entry get(String restoreUUID) { + return entries.get(restoreUUID); + } + + public boolean isEmpty() { + return entries.isEmpty(); + } + + @Override + public Iterator iterator() { + return entries.valuesIt(); + } + + public static final class Builder { + + private final ImmutableOpenMap.Builder entries = ImmutableOpenMap.builder(); + + public Builder() { + } + + public Builder(RestoreInProgress restoreInProgress) { + entries.putAll(restoreInProgress.entries); + } + + public Builder add(Entry entry) { + entries.put(entry.uuid, entry); + return this; + } + + public RestoreInProgress build() { + return new RestoreInProgress(entries.build()); } - return builder.append("]").toString(); } /** * Restore metadata */ public static class Entry { + private final String uuid; private final State state; private final Snapshot snapshot; private final ImmutableOpenMap shards; @@ -105,12 +131,14 @@ public static class Entry { /** * Creates new restore metadata * + * @param uuid uuid of the restore * @param snapshot snapshot * @param state current state of the restore process * @param indices list of indices being restored * @param shards map of shards being restored to their current restore status */ - public Entry(Snapshot snapshot, State state, List indices, ImmutableOpenMap shards) { + public Entry(String uuid, Snapshot snapshot, State state, List indices, + ImmutableOpenMap shards) { this.snapshot = Objects.requireNonNull(snapshot); this.state = Objects.requireNonNull(state); this.indices = Objects.requireNonNull(indices); @@ -119,6 +147,15 @@ public Entry(Snapshot snapshot, State state, List indices, ImmutableOpen } else { this.shards = shards; } + this.uuid = Objects.requireNonNull(uuid); + } + + /** + * Returns restore uuid + * @return restore uuid + */ + public String uuid() { + return uuid; } /** @@ -166,7 +203,8 @@ public boolean equals(Object o) { return false; } @SuppressWarnings("unchecked") Entry entry = (Entry) o; - return snapshot.equals(entry.snapshot) && + return uuid.equals(entry.uuid) && + snapshot.equals(entry.snapshot) && state == entry.state && indices.equals(entry.indices) && shards.equals(entry.shards); @@ -174,7 +212,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(snapshot, state, indices, shards); + return Objects.hash(uuid, snapshot, state, indices, shards); } } @@ -393,8 +431,15 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOException } public RestoreInProgress(StreamInput in) throws IOException { - Entry[] entries = new Entry[in.readVInt()]; - for (int i = 0; i < entries.length; i++) { + int count = in.readVInt(); + final ImmutableOpenMap.Builder entriesBuilder = ImmutableOpenMap.builder(count); + for (int i = 0; i < count; i++) { + final String uuid; + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + uuid = in.readString(); + } else { + uuid = BWC_UUID; + } Snapshot snapshot = new Snapshot(in); State state = State.fromValue(in.readByte()); int indices = in.readVInt(); @@ -409,9 +454,9 @@ public RestoreInProgress(StreamInput in) throws IOException { ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in); builder.put(shardId, shardState); } - entries[i] = new Entry(snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build()); + entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build())); } - this.entries = Arrays.asList(entries); + this.entries = entriesBuilder.build(); } /** @@ -420,7 +465,11 @@ public RestoreInProgress(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(entries.size()); - for (Entry entry : entries) { + for (ObjectCursor v : entries.values()) { + Entry entry = v.value; + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeString(entry.uuid); + } entry.snapshot().writeTo(out); out.writeByte(entry.state().value()); out.writeVInt(entry.indices().size()); @@ -441,8 +490,8 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startArray("snapshots"); - for (Entry entry : entries) { - toXContent(entry, builder, params); + for (ObjectCursor entry : entries.values()) { + toXContent(entry.value, builder, params); } builder.endArray(); return builder; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index 271098897a154..f8011efcd243c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.Version; +import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -208,22 +209,33 @@ public String toString() { * recovery from a snapshot */ public static class SnapshotRecoverySource extends RecoverySource { + private final String restoreUUID; private final Snapshot snapshot; private final String index; private final Version version; - public SnapshotRecoverySource(Snapshot snapshot, Version version, String index) { + public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) { + this.restoreUUID = restoreUUID; this.snapshot = Objects.requireNonNull(snapshot); this.version = Objects.requireNonNull(version); this.index = Objects.requireNonNull(index); } SnapshotRecoverySource(StreamInput in) throws IOException { + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + restoreUUID = in.readString(); + } else { + restoreUUID = RestoreInProgress.BWC_UUID; + } snapshot = new Snapshot(in); version = Version.readVersion(in); index = in.readString(); } + public String restoreUUID() { + return restoreUUID; + } + public Snapshot snapshot() { return snapshot; } @@ -238,6 +250,9 @@ public Version version() { @Override protected void writeAdditionalFields(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(Version.V_6_6_0)) { + out.writeString(restoreUUID); + } snapshot.writeTo(out); Version.writeVersion(version, out); out.writeString(index); @@ -253,12 +268,13 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param builder.field("repository", snapshot.getRepository()) .field("snapshot", snapshot.getSnapshotId().getName()) .field("version", version.toString()) - .field("index", index); + .field("index", index) + .field("restoreUUID", restoreUUID); } @Override public String toString() { - return "snapshot recovery from " + snapshot.toString(); + return "snapshot recovery [" + restoreUUID + "] from " + snapshot; } @Override @@ -271,12 +287,13 @@ public boolean equals(Object o) { } @SuppressWarnings("unchecked") SnapshotRecoverySource that = (SnapshotRecoverySource) o; - return snapshot.equals(that.snapshot) && index.equals(that.index) && version.equals(that.version); + return restoreUUID.equals(that.restoreUUID) && snapshot.equals(that.snapshot) + && index.equals(that.index) && version.equals(that.version); } @Override public int hashCode() { - return Objects.hash(snapshot, index, version); + return Objects.hash(restoreUUID, snapshot, index, version); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java index 63971ca46e468..18bd5f2f13a4a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.snapshots.Snapshot; /** * This {@link AllocationDecider} prevents shards that have failed to be @@ -46,25 +45,24 @@ public Decision canAllocate(final ShardRouting shardRouting, final RoutingAlloca return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot"); } - final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot(); + RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) recoverySource; final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE); if (restoresInProgress != null) { - for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) { - if (restoreInProgress.snapshot().equals(snapshot)) { - RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId()); - if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) { - assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting - + "] to be in initializing state but got [" + shardRestoreStatus.state() + "]"; - return allocation.decision(Decision.YES, NAME, "shard is currently being restored"); - } - break; + RestoreInProgress.Entry restoreInProgress = restoresInProgress.get(source.restoreUUID()); + if (restoreInProgress != null) { + RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId()); + if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) { + assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting + + "] to be in initializing state but got [" + shardRestoreStatus.state() + "]"; + return allocation.decision(Decision.YES, NAME, "shard is currently being restored"); } } } return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " + "manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " + - "allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName()); + "allocation of an empty primary shard", + source.snapshot(), shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName()); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 34046c205afcb..ad3907bf05284 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -219,15 +219,18 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener indexEntry : indices.entrySet()) { String index = indexEntry.getValue(); boolean partial = checkPartial(index); - SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(snapshot, snapshotInfo.version(), index); + SnapshotRecoverySource recoverySource = new SnapshotRecoverySource(restoreUUID, snapshot, snapshotInfo.version(), index); String renamedIndexName = indexEntry.getKey(); IndexMetaData snapshotIndexMetaData = metaData.index(index); snapshotIndexMetaData = updateIndexSettings(snapshotIndexMetaData, request.indexSettings, request.ignoreIndexSettings); @@ -329,8 +332,18 @@ public ClusterState execute(ClusterState currentState) { } shards = shardsBuilder.build(); - RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry(snapshot, overallState(RestoreInProgress.State.INIT, shards), Collections.unmodifiableList(new ArrayList<>(indices.keySet())), shards); - builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restoreEntry)); + RestoreInProgress.Entry restoreEntry = new RestoreInProgress.Entry( + restoreUUID, snapshot, overallState(RestoreInProgress.State.INIT, shards), + Collections.unmodifiableList(new ArrayList<>(indices.keySet())), + shards + ); + RestoreInProgress.Builder restoreInProgressBuilder; + if (restoreInProgress != null) { + restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress); + } else { + restoreInProgressBuilder = new RestoreInProgress.Builder(); + } + builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build()); } else { shards = ImmutableOpenMap.of(); } @@ -485,7 +498,7 @@ public TimeValue timeout() { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new RestoreCompletionResponse(snapshot, restoreInfo)); + listener.onResponse(new RestoreCompletionResponse(restoreUUID, snapshot, restoreInfo)); } }); @@ -498,8 +511,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set deletedIndices) { boolean changesMade = false; - final List entries = new ArrayList<>(); - for (RestoreInProgress.Entry entry : oldRestore.entries()) { + RestoreInProgress.Builder builder = new RestoreInProgress.Builder(); + for (RestoreInProgress.Entry entry : oldRestore) { ImmutableOpenMap.Builder shardsBuilder = null; for (ObjectObjectCursor cursor : entry.shards()) { ShardId shardId = cursor.key; @@ -513,27 +526,33 @@ public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInPr } if (shardsBuilder != null) { ImmutableOpenMap shards = shardsBuilder.build(); - entries.add(new RestoreInProgress.Entry(entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards)); + builder.add(new RestoreInProgress.Entry(entry.uuid(), entry.snapshot(), overallState(RestoreInProgress.State.STARTED, shards), entry.indices(), shards)); } else { - entries.add(entry); + builder.add(entry); } } if (changesMade) { - return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + return builder.build(); } else { return oldRestore; } } public static final class RestoreCompletionResponse { + private final String uuid; private final Snapshot snapshot; private final RestoreInfo restoreInfo; - private RestoreCompletionResponse(final Snapshot snapshot, final RestoreInfo restoreInfo) { + private RestoreCompletionResponse(final String uuid, final Snapshot snapshot, final RestoreInfo restoreInfo) { + this.uuid = uuid; this.snapshot = snapshot; this.restoreInfo = restoreInfo; } + public String getUuid() { + return uuid; + } + public Snapshot getSnapshot() { return snapshot; } @@ -544,7 +563,7 @@ public RestoreInfo getRestoreInfo() { } public static class RestoreInProgressUpdater extends RoutingChangesObserver.AbstractRoutingChangesObserver { - private final Map shardChanges = new HashMap<>(); + private final Map shardChanges = new HashMap<>(); @Override public void shardStarted(ShardRouting initializingShard, ShardRouting startedShard) { @@ -552,8 +571,8 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha if (initializingShard.primary()) { RecoverySource recoverySource = initializingShard.recoverySource(); if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { - Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); - changes(snapshot).shards.put(initializingShard.shardId(), + changes(recoverySource).shards.put( + initializingShard.shardId(), new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS)); } } @@ -564,13 +583,13 @@ public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) if (failedShard.primary() && failedShard.initializing()) { RecoverySource recoverySource = failedShard.recoverySource(); if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { - Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); // mark restore entry for this shard as failed when it's due to a file corruption. There is no need wait on retries // to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed, // however, we only want to acknowledge the restore operation once it has been successfully restored on another node. if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) { - changes(snapshot).shards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(), - RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage())); + changes(recoverySource).shards.put( + failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(), + RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage())); } } } @@ -581,9 +600,11 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali // if we force an empty primary, we should also fail the restore entry if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT && initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) { - Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot(); - changes(snapshot).shards.put(unassignedShard.shardId(), new ShardRestoreStatus(null, - RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource())); + changes(unassignedShard.recoverySource()).shards.put( + unassignedShard.shardId(), + new ShardRestoreStatus(null, + RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource()) + ); } } @@ -592,19 +613,21 @@ public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo n RecoverySource recoverySource = unassignedShard.recoverySource(); if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) { if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) { - Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot(); String reason = "shard could not be allocated to any of the nodes"; - changes(snapshot).shards.put(unassignedShard.shardId(), + changes(recoverySource).shards.put( + unassignedShard.shardId(), new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason)); } } } /** - * Helper method that creates update entry for the given shard id if such an entry does not exist yet. + * Helper method that creates update entry for the given recovery source's restore uuid + * if such an entry does not exist yet. */ - private Updates changes(Snapshot snapshot) { - return shardChanges.computeIfAbsent(snapshot, k -> new Updates()); + private Updates changes(RecoverySource recoverySource) { + assert recoverySource.getType() == RecoverySource.Type.SNAPSHOT; + return shardChanges.computeIfAbsent(((SnapshotRecoverySource) recoverySource).restoreUUID(), k -> new Updates()); } private static class Updates { @@ -613,38 +636,38 @@ private static class Updates { public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) { if (shardChanges.isEmpty() == false) { - final List entries = new ArrayList<>(); - for (RestoreInProgress.Entry entry : oldRestore.entries()) { - Snapshot snapshot = entry.snapshot(); - Updates updates = shardChanges.get(snapshot); - if (updates.shards.isEmpty() == false) { - ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(entry.shards()); + RestoreInProgress.Builder builder = new RestoreInProgress.Builder(); + for (RestoreInProgress.Entry entry : oldRestore) { + Updates updates = shardChanges.get(entry.uuid()); + ImmutableOpenMap shardStates = entry.shards(); + if (updates != null && updates.shards.isEmpty() == false) { + ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(shardStates); for (Map.Entry shard : updates.shards.entrySet()) { - shardsBuilder.put(shard.getKey(), shard.getValue()); + ShardId shardId = shard.getKey(); + ShardRestoreStatus status = shardStates.get(shardId); + if (status == null || status.state().completed() == false) { + shardsBuilder.put(shardId, shard.getValue()); + } } ImmutableOpenMap shards = shardsBuilder.build(); RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards); - entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards)); + builder.add(new RestoreInProgress.Entry(entry.uuid(), entry.snapshot(), newState, entry.indices(), shards)); } else { - entries.add(entry); + builder.add(entry); } } - return new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); + return builder.build(); } else { return oldRestore; } } } - public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) { + public static RestoreInProgress.Entry restoreInProgress(ClusterState state, String restoreUUID) { final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE); if (restoreInProgress != null) { - for (RestoreInProgress.Entry e : restoreInProgress.entries()) { - if (e.snapshot().equals(snapshot)) { - return e; - } - } + return restoreInProgress.get(restoreUUID); } return null; } @@ -652,15 +675,15 @@ public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snap static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor, ClusterStateTaskListener { static class Task { - final Snapshot snapshot; + final String uuid; - Task(Snapshot snapshot) { - this.snapshot = snapshot; + Task(String uuid) { + this.uuid = uuid; } @Override public String toString() { - return "clean restore state for restoring snapshot " + snapshot; + return "clean restore state for restore " + uuid; } } @@ -673,25 +696,24 @@ public String toString() { @Override public ClusterTasksResult execute(final ClusterState currentState, final List tasks) throws Exception { final ClusterTasksResult.Builder resultBuilder = ClusterTasksResult.builder().successes(tasks); - Set completedSnapshots = tasks.stream().map(e -> e.snapshot).collect(Collectors.toSet()); - final List entries = new ArrayList<>(); + Set completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet()); + RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder(); final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); boolean changed = false; if (restoreInProgress != null) { - for (RestoreInProgress.Entry entry : restoreInProgress.entries()) { - if (completedSnapshots.contains(entry.snapshot()) == false) { - entries.add(entry); - } else { + for (RestoreInProgress.Entry entry : restoreInProgress) { + if (completedRestores.contains(entry.uuid())) { changed = true; + } else { + restoreInProgressBuilder.add(entry); } } } if (changed == false) { return resultBuilder.build(currentState); } - RestoreInProgress updatedRestoreInProgress = new RestoreInProgress(entries.toArray(new RestoreInProgress.Entry[entries.size()])); ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(currentState.getCustoms()); - builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress); + builder.put(RestoreInProgress.TYPE, restoreInProgressBuilder.build()); ImmutableOpenMap customs = builder.build(); return resultBuilder.build(ClusterState.builder(currentState).customs(customs).build()); } @@ -713,12 +735,12 @@ private void cleanupRestoreState(ClusterChangedEvent event) { RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE); if (restoreInProgress != null) { - for (RestoreInProgress.Entry entry : restoreInProgress.entries()) { + for (RestoreInProgress.Entry entry : restoreInProgress) { if (entry.state().completed()) { assert completed(entry.shards()) : "state says completed but restore entries are not"; clusterService.submitStateUpdateTask( "clean up snapshot restore state", - new CleanRestoreStateTaskExecutor.Task(entry.snapshot()), + new CleanRestoreStateTaskExecutor.Task(entry.uuid()), ClusterStateTaskConfig.build(Priority.URGENT), cleanRestoreStateTaskExecutor, cleanRestoreStateTaskExecutor); @@ -815,7 +837,7 @@ public static void checkIndexClosing(ClusterState currentState, Set indicesToFail = null; - for (RestoreInProgress.Entry entry : restore.entries()) { + for (RestoreInProgress.Entry entry : restore) { for (ObjectObjectCursor shard : entry.shards()) { if (!shard.value.state().completed()) { IndexMetaData indexMetaData = currentState.metaData().index(shard.key.getIndex()); @@ -853,10 +875,10 @@ public void applyClusterState(ClusterChangedEvent event) { * @return true if repository is currently in use by one of the running snapshots */ public static boolean isRepositoryInUse(ClusterState clusterState, String repository) { - RestoreInProgress snapshots = clusterState.custom(RestoreInProgress.TYPE); - if (snapshots != null) { - for (RestoreInProgress.Entry snapshot : snapshots.entries()) { - if (repository.equals(snapshot.snapshot().getRepository())) { + RestoreInProgress restoreInProgress = clusterState.custom(RestoreInProgress.TYPE); + if (restoreInProgress != null) { + for (RestoreInProgress.Entry entry: restoreInProgress) { + if (repository.equals(entry.snapshot().getRepository())) { return true; } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index cdd86061b9940..6b69a40ebe280 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1137,7 +1137,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { // don't allow snapshot deletions while a restore is taking place, // otherwise we could end up deleting a snapshot that is being restored // and the files the restore depends on would all be gone - if (restoreInProgress.entries().isEmpty() == false) { + if (restoreInProgress.isEmpty() == false) { throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete snapshot during a restore"); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java index c479bc8122412..3f70ddcb9cd80 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java @@ -257,8 +257,8 @@ public void testPre63CustomsFiltering() { final String whiteListedClusterCustom = randomFrom(ClusterModule.PRE_6_3_CLUSTER_CUSTOMS_WHITE_LIST); final String whiteListedMetaDataCustom = randomFrom(ClusterModule.PRE_6_3_METADATA_CUSTOMS_WHITE_LIST); final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putCustom(whiteListedClusterCustom, new RestoreInProgress()) - .putCustom("other", new RestoreInProgress()) + .putCustom(whiteListedClusterCustom, new RestoreInProgress.Builder().build()) + .putCustom("other", new RestoreInProgress.Builder().build()) .metaData(MetaData.builder() .putCustom(whiteListedMetaDataCustom, new RepositoriesMetaData(Collections.emptyList())) .putCustom("other", new RepositoriesMetaData(Collections.emptyList())) diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index b5359634fcb36..88ab31f53a4b4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -699,11 +699,13 @@ public ClusterState.Custom randomCreate(String name) { (long) randomIntBetween(0, 1000), ImmutableOpenMap.of())); case 1: - return new RestoreInProgress(new RestoreInProgress.Entry( + return new RestoreInProgress.Builder().add( + new RestoreInProgress.Entry( + UUIDs.randomBase64UUID(), new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())), RestoreInProgress.State.fromValue((byte) randomIntBetween(0, 3)), emptyList(), - ImmutableOpenMap.of())); + ImmutableOpenMap.of())).build(); default: throw new IllegalArgumentException("Shouldn't be here"); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java index 1216f143686c4..5bcac4a1e2618 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardRoutingTests.java @@ -164,7 +164,7 @@ public void testEqualsIgnoringVersion() { } else { otherRouting = new ShardRouting(otherRouting.shardId(), otherRouting.currentNodeId(), otherRouting.relocatingNodeId(), otherRouting.primary(), otherRouting.state(), - new RecoverySource.SnapshotRecoverySource(new Snapshot("test", + new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), new Snapshot("test", new SnapshotId("s1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"), otherRouting.unassignedInfo(), otherRouting.allocationId(), otherRouting.getExpectedShardSize()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 68ba3bd0b1230..e035f0094dcc8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -142,6 +142,7 @@ public void testNewIndexRestored() { ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData) .routingTable(RoutingTable.builder().addAsNewRestore(metaData.index("test"), new SnapshotRecoverySource( + UUIDs.randomBase64UUID(), new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test"), new IntHashSet()).build()).build(); for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { @@ -157,7 +158,8 @@ public void testExistingIndexRestored() { ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData) .routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), - new SnapshotRecoverySource(new Snapshot("rep1", + new SnapshotRecoverySource( + UUIDs.randomBase64UUID(), new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test")).build()).build(); for (ShardRouting shard : clusterState.getRoutingNodes().shardsWithState(UNASSIGNED)) { assertThat(shard.unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED)); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index 6aa3495b840f8..cf46f2c08499c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -370,7 +370,9 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() { ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData) .routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), - new SnapshotRecoverySource(new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), + new SnapshotRecoverySource( + UUIDs.randomBase64UUID(), + new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), Version.CURRENT, "test")).build()) .nodes(DiscoveryNodes.builder().add(newNode).add(oldNode1).add(oldNode2)).build(); AllocationDeciders allocationDeciders = new AllocationDeciders(Arrays.asList( @@ -486,9 +488,11 @@ public void testMessages() { newNode.node().getVersion() + "] to a node with older version [" + oldNode.node().getVersion() + "]")); final SnapshotRecoverySource newVersionSnapshot = new SnapshotRecoverySource( + UUIDs.randomBase64UUID(), new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), newNode.node().getVersion(), "test"); final SnapshotRecoverySource oldVersionSnapshot = new SnapshotRecoverySource( + UUIDs.randomBase64UUID(), new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), oldNode.node().getVersion(), "test"); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java index b67e0ccae22b3..3c88de4b639ca 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; @@ -335,6 +336,7 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); Snapshot snapshot = new Snapshot("repo", new SnapshotId("snap", "randomId")); Set snapshotIndices = new HashSet<>(); + String restoreUUID = UUIDs.randomBase64UUID(); for (ObjectCursor cursor: metaData.indices().values()) { Index index = cursor.value.getIndex(); IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value); @@ -357,12 +359,14 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat case 3: snapshotIndices.add(index.getName()); routingTableBuilder.addAsNewRestore(indexMetaData, - new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet()); + new SnapshotRecoverySource( + restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet()); break; case 4: snapshotIndices.add(index.getName()); routingTableBuilder.addAsRestore(indexMetaData, - new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName())); + new SnapshotRecoverySource( + restoreUUID, snapshot, Version.CURRENT, indexMetaData.getIndex().getName())); break; case 5: routingTableBuilder.addAsNew(indexMetaData); @@ -385,9 +389,9 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat } } - RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT, + RestoreInProgress.Entry restore = new RestoreInProgress.Entry(restoreUUID, snapshot, RestoreInProgress.State.INIT, new ArrayList<>(snapshotIndices), restoreShards.build()); - restores.put(RestoreInProgress.TYPE, new RestoreInProgress(restore)); + restores.put(RestoreInProgress.TYPE, new RestoreInProgress.Builder().add(restore).build()); } return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index 86190b107e5e7..60e3e2438c19f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.Snapshot; @@ -139,10 +140,11 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { Snapshot snapshot = recoverySource.snapshot(); RestoreInProgress.State restoreState = RestoreInProgress.State.STARTED; - RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, restoreState, singletonList("test"), shards.build()); + RestoreInProgress.Entry restore = + new RestoreInProgress.Entry(recoverySource.restoreUUID(), snapshot, restoreState, singletonList("test"), shards.build()); clusterState = ClusterState.builder(clusterState) - .putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restore)) + .putCustom(RestoreInProgress.TYPE, new RestoreInProgress.Builder().add(restore).build()) .routingTable(routingTable) .build(); @@ -202,6 +204,6 @@ private Decision executeAllocation(final ClusterState clusterState, final ShardR private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) { Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid")); - return new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test"); + return new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test"); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java index f67996b215ab5..696da1a2b5539 100644 --- a/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/serialization/ClusterSerializationTests.java @@ -118,14 +118,15 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception { )); if (includeRestore) { builder.putCustom(RestoreInProgress.TYPE, - new RestoreInProgress( + new RestoreInProgress.Builder().add( new RestoreInProgress.Entry( - new Snapshot("repo2", new SnapshotId("snap2", UUIDs.randomBase64UUID())), + UUIDs.randomBase64UUID(), new Snapshot("repo2", new SnapshotId("snap2", UUIDs.randomBase64UUID())), RestoreInProgress.State.STARTED, Collections.singletonList("index_name"), ImmutableOpenMap.of() ) - )); + ).build() + ); } ClusterState clusterState = builder.incrementVersion().build(); @@ -137,6 +138,7 @@ public void testSnapshotDeletionsInProgressSerialization() throws Exception { outStream.setVersion(version); diffs.writeTo(outStream); StreamInput inStream = outStream.bytes().streamInput(); + inStream.setVersion(version); inStream = new NamedWriteableAwareStreamInput(inStream, new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); Diff serializedDiffs = ClusterState.readDiffFrom(inStream, clusterState.nodes().getLocalNode()); ClusterState stateAfterDiffs = serializedDiffs.apply(ClusterState.EMPTY_STATE); diff --git a/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index e6fc0c535dfc4..3be4f1c1d809e 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -391,7 +391,8 @@ private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocat final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID())); RoutingTable routingTable = RoutingTable.builder() - .addAsRestore(metaData.index(shardId.getIndex()), new SnapshotRecoverySource(snapshot, Version.CURRENT, shardId.getIndexName())) + .addAsRestore(metaData.index(shardId.getIndex()), + new SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, shardId.getIndexName())) .build(); ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 83e40782da6ba..d954085d04d1e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2095,7 +2095,7 @@ public void testRestoreShard() throws IOException { RecoverySource.ExistingStoreRecoverySource.INSTANCE); final Snapshot snapshot = new Snapshot("foo", new SnapshotId("bar", UUIDs.randomBase64UUID())); routing = ShardRoutingHelper.newWithRestoreSource(routing, - new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test")); + new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, Version.CURRENT, "test")); target = reinitShard(target, routing); Store sourceStore = source.store(); Store targetStore = target.store(); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index b3f0762c8bd54..9b2bacdd7fb4c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -500,6 +500,7 @@ public void testSnapshotRecovery() throws Exception { for (RecoveryState recoveryState : recoveryStates) { SnapshotRecoverySource recoverySource = new SnapshotRecoverySource( + ((SnapshotRecoverySource)recoveryState.getRecoverySource()).restoreUUID(), new Snapshot(REPO_NAME, createSnapshotResponse.getSnapshotInfo().snapshotId()), Version.CURRENT, INDEX_NAME); assertRecoveryState(recoveryState, 0, recoverySource, true, Stage.DONE, null, nodeA); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index e5a1a246f85e9..971241662c2ac 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -89,6 +89,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.StoredScriptsIT; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -1113,7 +1114,8 @@ private void unrestorableUseCase(final String indexName, // check that there is no restore in progress RestoreInProgress restoreInProgress = clusterStateResponse.getState().custom(RestoreInProgress.TYPE); assertNotNull("RestoreInProgress must be not null", restoreInProgress); - assertThat("RestoreInProgress must be empty", restoreInProgress.entries(), hasSize(0)); + assertTrue( + "RestoreInProgress must be empty but found entries in " + restoreInProgress, restoreInProgress.isEmpty()); // check that the shards have been created but are not assigned assertThat(clusterStateResponse.getState().getRoutingTable().allShards(indexName), hasSize(numShards.totalNumShards)); @@ -3513,6 +3515,127 @@ public void testSnapshottingWithMissingSequenceNumbers() { assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L)); } + public void testParallelRestoreOperations() { + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-restore-snapshot-repo"; + String snapshotName1 = "test-restore-snapshot1"; + String snapshotName2 = "test-restore-snapshot2"; + String absolutePath = randomRepoPath().toAbsolutePath().toString(); + logger.info("Path [{}]", absolutePath); + String restoredIndexName1 = indexName1 + "-restored"; + String restoredIndexName2 = indexName2 + "-restored"; + String typeName = "actions"; + String expectedValue = "expected"; + + Client client = client(); + // Write a document + String docId = Integer.toString(randomInt()); + index(indexName1, typeName, docId, "value", expectedValue); + + String docId2 = Integer.toString(randomInt()); + index(indexName2, typeName, docId2, "value", expectedValue); + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", absolutePath) + )); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName1) + .setWaitForCompletion(true) + .setIndices(indexName1) + .get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + CreateSnapshotResponse createSnapshotResponse2 = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName2) + .setWaitForCompletion(true) + .setIndices(indexName2) + .get(); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse2.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse2.getSnapshotInfo().totalShards())); + assertThat(createSnapshotResponse2.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName1) + .setWaitForCompletion(false) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .get(); + RestoreSnapshotResponse restoreSnapshotResponse2 = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName2) + .setWaitForCompletion(false) + .setRenamePattern(indexName2) + .setRenameReplacement(restoredIndexName2) + .get(); + assertThat(restoreSnapshotResponse1.status(), equalTo(RestStatus.ACCEPTED)); + assertThat(restoreSnapshotResponse2.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(restoredIndexName1, restoredIndexName2); + assertThat(client.prepareGet(restoredIndexName1, typeName, docId).get().isExists(), equalTo(true)); + assertThat(client.prepareGet(restoredIndexName2, typeName, docId2).get().isExists(), equalTo(true)); + } + + public void testParallelRestoreOperationsFromSingleSnapshot() throws Exception { + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String repoName = "test-restore-snapshot-repo"; + String snapshotName = "test-restore-snapshot"; + String absolutePath = randomRepoPath().toAbsolutePath().toString(); + logger.info("Path [{}]", absolutePath); + String restoredIndexName1 = indexName1 + "-restored"; + String restoredIndexName2 = indexName2 + "-restored"; + String typeName = "actions"; + String expectedValue = "expected"; + + Client client = client(); + // Write a document + String docId = Integer.toString(randomInt()); + index(indexName1, typeName, docId, "value", expectedValue); + + String docId2 = Integer.toString(randomInt()); + index(indexName2, typeName, docId2, "value", expectedValue); + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", absolutePath) + )); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .setIndices(indexName1, indexName2) + .get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + + ActionFuture restoreSnapshotResponse1 = client.admin().cluster() + .prepareRestoreSnapshot(repoName, snapshotName) + .setIndices(indexName1) + .setRenamePattern(indexName1) + .setRenameReplacement(restoredIndexName1) + .execute(); + + boolean sameSourceIndex = randomBoolean(); + + ActionFuture restoreSnapshotResponse2 = client.admin().cluster() + .prepareRestoreSnapshot(repoName, snapshotName) + .setIndices(sameSourceIndex ? indexName1 : indexName2) + .setRenamePattern(sameSourceIndex ? indexName1 : indexName2) + .setRenameReplacement(restoredIndexName2) + .execute(); + assertThat(restoreSnapshotResponse1.get().status(), equalTo(RestStatus.ACCEPTED)); + assertThat(restoreSnapshotResponse2.get().status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(restoredIndexName1, restoredIndexName2); + assertThat(client.prepareGet(restoredIndexName1, typeName, docId).get().isExists(), equalTo(true)); + assertThat(client.prepareGet(restoredIndexName2, typeName, sameSourceIndex ? docId : docId2).get().isExists(), equalTo(true)); + } + @TestLogging("org.elasticsearch.snapshots:TRACE") public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { final Client client = client(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 995900c1e65e7..9d892d192a25a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -144,6 +144,7 @@ public static RecoverySource randomRecoverySource() { RecoverySource.PeerRecoverySource.INSTANCE, RecoverySource.LocalShardsRecoverySource.INSTANCE, new RecoverySource.SnapshotRecoverySource( + UUIDs.randomBase64UUID(), new Snapshot("repo", new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())), Version.CURRENT, "some_index")); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 6a2c7780c383e..e1afebd5640f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; @@ -755,7 +756,8 @@ protected void recoverShardFromSnapshot(final IndexShard shard, final String index = shardId.getIndexName(); final IndexId indexId = new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID()); final DiscoveryNode node = getFakeDiscoNode(shard.routingEntry().currentNodeId()); - final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(snapshot, version, index); + final RecoverySource.SnapshotRecoverySource recoverySource = + new RecoverySource.SnapshotRecoverySource(UUIDs.randomBase64UUID(), snapshot, version, index); final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, recoverySource); shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index aff5a2862e17a..0057df49b7c36 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -158,12 +158,13 @@ private ActionListener waitForRestore( public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) { if (restoreCompletionResponse.getRestoreInfo() == null) { final Snapshot snapshot = restoreCompletionResponse.getSnapshot(); + final String uuid = restoreCompletionResponse.getUuid(); ClusterStateListener clusterStateListener = new ClusterStateListener() { @Override public void clusterChanged(ClusterChangedEvent changedEvent) { - final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot); - final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot); + final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid); + final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid); if (prevEntry == null) { // When there is a master failure after a restore has been started, this listener might not be registered // on the current master and as such it might miss some intermediary cluster states due to batching. diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index fe72ab59558aa..6b94920c561dc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; @@ -204,7 +205,8 @@ public void testRestoreMinmal() throws IOException { shard.refresh("test"); ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), true, ShardRoutingState.INITIALIZING, - new RecoverySource.SnapshotRecoverySource(new Snapshot("src_only", snapshotId), Version.CURRENT, indexId.getId())); + new RecoverySource.SnapshotRecoverySource( + UUIDs.randomBase64UUID(), new Snapshot("src_only", snapshotId), Version.CURRENT, indexId.getId())); IndexMetaData metaData = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetaData(snapshotId, indexId)); IndexShard restoredShard = newShard(shardRouting, metaData, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {}); restoredShard.mapperService().merge(shard.indexSettings().getIndexMetaData(), MapperService.MergeReason.MAPPING_RECOVERY, false);