Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNAPSHOTS: Allow Parallel Restore Operations (#36397) #36659

Merged
merged 4 commits into from
Dec 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/reference/indices/recovery.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ Response:
"repository" : "my_repository",
"snapshot" : "my_snapshot",
"index" : "index1",
"version" : "{version}"
"version" : "{version}",
"restoreUUID": "PDh1ZAOaRbiGIVtCvZOMww"
},
"target" : {
"id" : "ryqJ5lO5S4-lSFbGntkEkg",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,13 @@ protected void masterOperation(final RestoreSnapshotRequest request, final Clust
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
final Snapshot snapshot = restoreCompletionResponse.getSnapshot();
String uuid = restoreCompletionResponse.getUuid();

ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), snapshot);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), snapshot);
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
customSupplier.put(RestoreInProgress.TYPE, () -> new RestoreInProgress.Builder().build());
customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
for (ClusterPlugin plugin : clusterPlugins) {
Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
Expand Down
113 changes: 81 additions & 32 deletions server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
Expand All @@ -32,36 +33,33 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

/**
* Meta data about restore processes that are currently executing
*/
public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom {
public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements Custom, Iterable<RestoreInProgress.Entry> {

/**
* Fallback UUID used for restore operations that were started before v7.0 and don't have a uuid in the cluster state.
*/
public static final String BWC_UUID = new UUID(0, 0).toString();

public static final String TYPE = "restore";

private final List<Entry> entries;
private final ImmutableOpenMap<String, Entry> entries;

/**
* Constructs new restore metadata
*
* @param entries list of currently running restore processes
* @param entries map of currently running restore processes keyed by their restore uuid
*/
public RestoreInProgress(Entry... entries) {
this.entries = Arrays.asList(entries);
}

/**
* Returns list of currently running restore processes
*
* @return list of currently running restore processes
*/
public List<Entry> entries() {
return this.entries;
private RestoreInProgress(ImmutableOpenMap<String, Entry> entries) {
this.entries = entries;
}

@Override
Expand All @@ -83,20 +81,48 @@ public int hashCode() {

@Override
public String toString() {
StringBuilder builder = new StringBuilder("RestoreInProgress[");
for (int i = 0; i < entries.size(); i++) {
builder.append(entries.get(i).snapshot().getSnapshotId().getName());
if (i + 1 < entries.size()) {
builder.append(",");
}
return new StringBuilder("RestoreInProgress[").append(entries).append("]").toString();
}

public Entry get(String restoreUUID) {
return entries.get(restoreUUID);
}

public boolean isEmpty() {
return entries.isEmpty();
}

@Override
public Iterator<Entry> iterator() {
return entries.valuesIt();
}

public static final class Builder {

private final ImmutableOpenMap.Builder<String, Entry> entries = ImmutableOpenMap.builder();

public Builder() {
}

public Builder(RestoreInProgress restoreInProgress) {
entries.putAll(restoreInProgress.entries);
}

public Builder add(Entry entry) {
entries.put(entry.uuid, entry);
return this;
}

public RestoreInProgress build() {
return new RestoreInProgress(entries.build());
}
return builder.append("]").toString();
}

/**
* Restore metadata
*/
public static class Entry {
private final String uuid;
private final State state;
private final Snapshot snapshot;
private final ImmutableOpenMap<ShardId, ShardRestoreStatus> shards;
Expand All @@ -105,12 +131,14 @@ public static class Entry {
/**
* Creates new restore metadata
*
* @param uuid uuid of the restore
* @param snapshot snapshot
* @param state current state of the restore process
* @param indices list of indices being restored
* @param shards map of shards being restored to their current restore status
*/
public Entry(Snapshot snapshot, State state, List<String> indices, ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
public Entry(String uuid, Snapshot snapshot, State state, List<String> indices,
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.indices = Objects.requireNonNull(indices);
Expand All @@ -119,6 +147,15 @@ public Entry(Snapshot snapshot, State state, List<String> indices, ImmutableOpen
} else {
this.shards = shards;
}
this.uuid = Objects.requireNonNull(uuid);
}

/**
* Returns restore uuid
* @return restore uuid
*/
public String uuid() {
return uuid;
}

/**
Expand Down Expand Up @@ -166,15 +203,16 @@ public boolean equals(Object o) {
return false;
}
@SuppressWarnings("unchecked") Entry entry = (Entry) o;
return snapshot.equals(entry.snapshot) &&
return uuid.equals(entry.uuid) &&
snapshot.equals(entry.snapshot) &&
state == entry.state &&
indices.equals(entry.indices) &&
shards.equals(entry.shards);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, state, indices, shards);
return Objects.hash(uuid, snapshot, state, indices, shards);
}
}

Expand Down Expand Up @@ -393,8 +431,15 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
}

public RestoreInProgress(StreamInput in) throws IOException {
Entry[] entries = new Entry[in.readVInt()];
for (int i = 0; i < entries.length; i++) {
int count = in.readVInt();
final ImmutableOpenMap.Builder<String, Entry> entriesBuilder = ImmutableOpenMap.builder(count);
for (int i = 0; i < count; i++) {
final String uuid;
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
uuid = in.readString();
} else {
uuid = BWC_UUID;
}
Snapshot snapshot = new Snapshot(in);
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
Expand All @@ -409,9 +454,9 @@ public RestoreInProgress(StreamInput in) throws IOException {
ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in);
builder.put(shardId, shardState);
}
entries[i] = new Entry(snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build());
entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build()));
}
this.entries = Arrays.asList(entries);
this.entries = entriesBuilder.build();
}

/**
Expand All @@ -420,7 +465,11 @@ public RestoreInProgress(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(entries.size());
for (Entry entry : entries) {
for (ObjectCursor<Entry> v : entries.values()) {
Entry entry = v.value;
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeString(entry.uuid);
}
entry.snapshot().writeTo(out);
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
Expand All @@ -441,8 +490,8 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
for (Entry entry : entries) {
toXContent(entry, builder, params);
for (ObjectCursor<Entry> entry : entries.values()) {
toXContent(entry.value, builder, params);
}
builder.endArray();
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -208,22 +209,33 @@ public String toString() {
* recovery from a snapshot
*/
public static class SnapshotRecoverySource extends RecoverySource {
private final String restoreUUID;
private final Snapshot snapshot;
private final String index;
private final Version version;

public SnapshotRecoverySource(Snapshot snapshot, Version version, String index) {
public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, String index) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(index);
}

SnapshotRecoverySource(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
restoreUUID = in.readString();
} else {
restoreUUID = RestoreInProgress.BWC_UUID;
}
snapshot = new Snapshot(in);
version = Version.readVersion(in);
index = in.readString();
}

public String restoreUUID() {
return restoreUUID;
}

public Snapshot snapshot() {
return snapshot;
}
Expand All @@ -238,6 +250,9 @@ public Version version() {

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeString(restoreUUID);
}
snapshot.writeTo(out);
Version.writeVersion(version, out);
out.writeString(index);
Expand All @@ -253,12 +268,13 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
builder.field("repository", snapshot.getRepository())
.field("snapshot", snapshot.getSnapshotId().getName())
.field("version", version.toString())
.field("index", index);
.field("index", index)
.field("restoreUUID", restoreUUID);
}

@Override
public String toString() {
return "snapshot recovery from " + snapshot.toString();
return "snapshot recovery [" + restoreUUID + "] from " + snapshot;
}

@Override
Expand All @@ -271,12 +287,13 @@ public boolean equals(Object o) {
}

@SuppressWarnings("unchecked") SnapshotRecoverySource that = (SnapshotRecoverySource) o;
return snapshot.equals(that.snapshot) && index.equals(that.index) && version.equals(that.version);
return restoreUUID.equals(that.restoreUUID) && snapshot.equals(that.snapshot)
&& index.equals(that.index) && version.equals(that.version);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, index, version);
return Objects.hash(restoreUUID, snapshot, index, version);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.snapshots.Snapshot;

/**
* This {@link AllocationDecider} prevents shards that have failed to be
Expand All @@ -46,25 +45,24 @@ public Decision canAllocate(final ShardRouting shardRouting, final RoutingAlloca
return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
}

final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot();
RecoverySource.SnapshotRecoverySource source = (RecoverySource.SnapshotRecoverySource) recoverySource;
final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);

if (restoresInProgress != null) {
for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) {
if (restoreInProgress.snapshot().equals(snapshot)) {
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
}
break;
RestoreInProgress.Entry restoreInProgress = restoresInProgress.get(source.restoreUUID());
if (restoreInProgress != null) {
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
}
}
}
return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " +
"manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " +
"allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
"allocation of an empty primary shard",
source.snapshot(), shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
}

@Override
Expand Down
Loading