
Recovery with syncId should verify seqno infos (#41265)
This change verifies the sequence-number stats stored in the commit and aborts
recovery if the source and target have the same syncId but different sequence
numbers. This commit also adds an upgrade test to ensure that we always utilize syncId.
dnhatn committed May 24, 2019
1 parent 9772574 commit 5b0b98b
Showing 4 changed files with 174 additions and 39 deletions.
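
In essence, the fix makes the sync-id shortcut conditional on the sequence-number stats that each shard copy records in its Lucene commit: matching sync ids alone no longer justify skipping the file-copy phase of peer recovery. A minimal sketch of the idea, using the Store.MetadataSnapshot and SequenceNumbers APIs that appear in the diff below (the helper name safeToSkipFileCopy is hypothetical; the real logic lives in RecoverySourceHandler#canSkipPhase1):

    // Sketch only - simplified from the change below, not the verbatim implementation.
    static boolean safeToSkipFileCopy(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
        String syncId = source.getSyncId();
        if (syncId == null || syncId.equals(target.getSyncId()) == false) {
            return false; // no shared sync id -> run the normal file-based phase1
        }
        // Both commits carry their seq_no stats in the Lucene commit user data.
        SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.getCommitUserData().entrySet());
        SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.getCommitUserData().entrySet());
        if (sourceSeqNos.localCheckpoint != targetSeqNos.localCheckpoint || sourceSeqNos.maxSeqNo != targetSeqNos.maxSeqNo) {
            // same sync id but different seq_no stats: the inconsistency this commit now detects
            throw new IllegalStateException("sync id matches but seq_no stats differ - aborting recovery");
        }
        return true; // identical sync id and seq_no stats -> safe to skip copying files
    }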
@@ -42,6 +42,7 @@ public static ClusterType parse(String value) {
}

protected static final ClusterType CLUSTER_TYPE = ClusterType.parse(System.getProperty("tests.rest.suite"));
protected static final boolean firstMixedRound = Boolean.parseBoolean(System.getProperty("tests.first_round", "false"));

@Override
protected final boolean preserveIndicesUponCompletion() {
@@ -33,13 +33,15 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -172,6 +174,25 @@ public void testRecoveryWithConcurrentIndexing() throws Exception {
}
}

    private void assertDocCountOnAllCopies(String index, int expectedCount) throws Exception {
        assertBusy(() -> {
            Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));
            String xpath = "routing_table.indices." + index + ".shards.0.node";
            @SuppressWarnings("unchecked") List<String> assignedNodes = (List<String>) XContentMapValues.extractValue(xpath, state);
            assertNotNull(state.toString(), assignedNodes);
            for (String assignedNode : assignedNodes) {
                try {
                    assertCount(index, "_only_nodes:" + assignedNode, expectedCount);
                } catch (ResponseException e) {
                    if (e.getMessage().contains("no data nodes with criteria [" + assignedNode + "] found for shard: [" + index + "][0]")) {
                        throw new AssertionError(e); // shard is relocating - ask assertBusy to retry
                    }
                    throw e;
                }
            }
        });
    }

private void assertCount(final String index, final String preference, final int expectedCount) throws IOException {
final int actualDocs;
try {
@@ -275,34 +296,52 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
}
}

+    /**
+     * This test ensures that peer recovery won't get stuck in a situation where the recovery target and the recovery source
+     * have an identical sync id but a different local checkpoint in the commit; in particular, the target does not have
+     * sequence numbers yet. This is possible if the primary is on 6.x while the replica was on 5.x and some write
+     * operations with sequence numbers have taken place. If this is not the case, then peer recovery should utilize
+     * syncId and skip copying files.
+     */
     public void testRecoverSyncedFlushIndex() throws Exception {
         final String index = "recover_synced_flush_index";
         if (CLUSTER_TYPE == ClusterType.OLD) {
             Settings.Builder settings = Settings.builder()
                 .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
-                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
-                // if the node with the replica is the first to be restarted, while a replica is still recovering
-                // then delayed allocation will kick in. When the node comes back, the master will search for a copy
-                // but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
-                // before timing out
-                .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
-                .put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0"); // fail faster
+                .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2);
             if (randomBoolean()) {
                 settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1")
                     .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")
                     .put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), "256b");
             }
             createIndex(index, settings.build());
-            indexDocs(index, 0, randomInt(5));
-            // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
-            // A synced-flush request considers the global checkpoint sync as an ongoing operation because it acquires a shard permit.
-            assertBusy(() -> {
-                try {
-                    Response resp = client().performRequest(new Request("POST", index + "/_flush/synced"));
-                    Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
-                    assertThat(result.get("successful"), equalTo(result.get("total")));
-                    assertThat(result.get("failed"), equalTo(0));
-                } catch (ResponseException ex) {
-                    throw new AssertionError(ex); // cause assertBusy to retry
-                }
-            });
+            ensureGreen(index);
+            indexDocs(index, 0, 40);
+            syncedFlush(index);
+        } else if (CLUSTER_TYPE == ClusterType.MIXED) {
+            ensureGreen(index);
+            if (firstMixedRound) {
+                assertPeerRecoveredFiles("peer recovery with syncId should not copy files", index, "upgraded-node-0", equalTo(0));
+                assertDocCountOnAllCopies(index, 40);
+                indexDocs(index, 40, 50);
+                syncedFlush(index);
+            } else {
+                assertPeerRecoveredFiles("peer recovery with syncId should not copy files", index, "upgraded-node-1", equalTo(0));
+                assertDocCountOnAllCopies(index, 90);
+                indexDocs(index, 90, 60);
+                syncedFlush(index);
+                // exclude node-2 from the allocation filter so we can trim the translog on the primary before node-2 starts recovering
+                if (randomBoolean()) {
+                    updateIndexSettings(index, Settings.builder().put("index.routing.allocation.include._name", "upgraded-*"));
+                }
+            }
+        } else {
+            final int docsAfterUpgraded = randomIntBetween(0, 100);
+            indexDocs(index, 150, docsAfterUpgraded);
+            ensureGreen(index);
+            assertPeerRecoveredFiles("peer recovery with syncId should not copy files", index, "upgraded-node-2", equalTo(0));
+            assertDocCountOnAllCopies(index, 150 + docsAfterUpgraded);
         }
         ensureGreen(index);
     }
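
To make the upgrade scenario described in the test's comment concrete: the sync id and the seq_no stats all live in the Lucene commit user data of each shard copy. A hypothetical illustration follows (the key constants Engine.SYNC_COMMIT_ID, SequenceNumbers.LOCAL_CHECKPOINT_KEY, and SequenceNumbers.MAX_SEQ_NO are the real ones used in the diff; the values are invented):

    // Hypothetical commit user data - values invented for illustration.
    // Primary that has written operations with sequence numbers:
    Map<String, String> primaryCommit = new HashMap<>();
    primaryCommit.put(Engine.SYNC_COMMIT_ID, "A1b2C3d4E5f6");        // key "sync_id"
    primaryCommit.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, "41");   // key "local_checkpoint"
    primaryCommit.put(SequenceNumbers.MAX_SEQ_NO, "41");             // key "max_seq_no"

    // Replica whose commit was inherited from a 5.x node: same sync id, but no seq_no
    // stats yet - so the sync id alone does not prove the copies hold the same operations.
    Map<String, String> replicaCommit = new HashMap<>();
    replicaCommit.put(Engine.SYNC_COMMIT_ID, "A1b2C3d4E5f6");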

public void testRecoveryWithSoftDeletes() throws Exception {
@@ -480,4 +519,52 @@ private void assertClosedIndex(final String index, final boolean checkRoutingTab
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
}
}

    private void syncedFlush(String index) throws Exception {
        // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
        // A synced-flush request considers the global checkpoint sync as an ongoing operation because it acquires a shard permit.
        assertBusy(() -> {
            try {
                Response resp = client().performRequest(new Request("POST", index + "/_flush/synced"));
                Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
                assertThat(result.get("failed"), equalTo(0));
            } catch (ResponseException ex) {
                throw new AssertionError(ex); // cause assertBusy to retry
            }
        });
        // ensure the global checkpoint is synced; otherwise we might trim the commit that carries the syncId
        ensureGlobalCheckpointSynced(index);
    }

    @SuppressWarnings("unchecked")
    private void assertPeerRecoveredFiles(String reason, String index, String targetNode, Matcher<Integer> sizeMatcher) throws IOException {
        Map<?, ?> recoveryStats = entityAsMap(client().performRequest(new Request("GET", index + "/_recovery")));
        List<Map<?, ?>> shards = (List<Map<?, ?>>) XContentMapValues.extractValue(index + "." + "shards", recoveryStats);
        for (Map<?, ?> shard : shards) {
            if (Objects.equals(XContentMapValues.extractValue("type", shard), "PEER")) {
                if (Objects.equals(XContentMapValues.extractValue("target.name", shard), targetNode)) {
                    Integer recoveredFileSize = (Integer) XContentMapValues.extractValue("index.files.recovered", shard);
                    assertThat(reason + " target node [" + targetNode + "] stats [" + recoveryStats + "]", recoveredFileSize, sizeMatcher);
                }
            }
        }
    }

    @SuppressWarnings("unchecked")
    private void ensureGlobalCheckpointSynced(String index) throws Exception {
        assertBusy(() -> {
            Map<?, ?> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
            List<Map<?, ?>> shardStats = (List<Map<?, ?>>) XContentMapValues.extractValue("indices." + index + ".shards.0", stats);
            shardStats.stream()
                .map(shard -> (Map<?, ?>) XContentMapValues.extractValue("seq_no", shard))
                .filter(Objects::nonNull)
                .forEach(seqNoStat -> {
                    long globalCheckpoint = ((Number) XContentMapValues.extractValue("global_checkpoint", seqNoStat)).longValue();
                    long localCheckpoint = ((Number) XContentMapValues.extractValue("local_checkpoint", seqNoStat)).longValue();
                    long maxSeqNo = ((Number) XContentMapValues.extractValue("max_seq_no", seqNoStat)).longValue();
                    assertThat(shardStats.toString(), localCheckpoint, equalTo(maxSeqNo));
                    assertThat(shardStats.toString(), globalCheckpoint, equalTo(maxSeqNo));
                });
        }, 60, TimeUnit.SECONDS);
    }
}
@@ -359,25 +359,10 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp
recoverySourceMetadata.asMap().size() + " files", name);
}
}
-            // Generate a "diff" of all the identical, different, and missing
-            // segment files on the target node, using the existing files on
-            // the source node
-            String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
-            String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
-            final boolean recoverWithSyncId = recoverySourceSyncId != null &&
-                recoverySourceSyncId.equals(recoveryTargetSyncId);
-            if (recoverWithSyncId) {
-                final long numDocsTarget = request.metadataSnapshot().getNumDocs();
-                final long numDocsSource = recoverySourceMetadata.getNumDocs();
-                if (numDocsTarget != numDocsSource) {
-                    throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " +
-                        "of docs differ: " + numDocsSource + " (" + request.sourceNode().getName() + ", primary) vs " + numDocsTarget
-                        + "(" + request.targetNode().getName() + ")");
-                }
-                // we shortcut recovery here because we have nothing to copy. but we must still start the engine on the target.
-                // so we don't return here
-                logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target", recoverySourceSyncId);
-            } else {
+            if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
+                // Generate a "diff" of all the identical, different, and missing
+                // segment files on the target node, using the existing files on
+                // the source node
                 final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
                 for (StoreFileMetaData md : diff.identical) {
                     phase1ExistingFileNames.add(md.name());
Expand Down Expand Up @@ -458,6 +443,9 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp
                         throw targetException;
                     }
                 }
+            } else {
+                logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
+                    recoverySourceMetadata.getSyncId());
             }
             final TimeValue took = stopWatch.totalTime();
             logger.trace("recovery [phase1]: took [{}]", took);
@@ -470,6 +458,26 @@ public SendFileResult phase1(final IndexCommit snapshot, final long globalCheckp
}
}

    boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
        if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) {
            return false;
        }
        if (source.getNumDocs() != target.getNumDocs()) {
            throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number " +
                "of docs differ: " + source.getNumDocs() + " (" + request.sourceNode().getName() + ", primary) vs " + target.getNumDocs()
                + "(" + request.targetNode().getName() + ")");
        }
        SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.getCommitUserData().entrySet());
        SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.getCommitUserData().entrySet());
        if (sourceSeqNos.localCheckpoint != targetSeqNos.localCheckpoint || targetSeqNos.maxSeqNo != sourceSeqNos.maxSeqNo) {
            final String message = "try to recover " + request.shardId() + " with sync id but " +
                "seq_no stats are mismatched: [" + source.getCommitUserData() + "] vs [" + target.getCommitUserData() + "]";
            assert false : message;
            throw new IllegalStateException(message);
        }
        return true;
    }
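
At a glance, the three possible outcomes of this check, written as a hedged mini-test (it reuses the handler and the newMetadataSnapshot helper from the unit test further below; the checkpoint values are invented):

    String syncId = UUIDs.randomBase64UUID();
    // identical sync id and identical seq_no stats -> file copy can be skipped
    assertTrue(handler.canSkipPhase1(
        newMetadataSnapshot(syncId, "41", "41", 10),
        newMetadataSnapshot(syncId, "41", "41", 10)));
    // no shared sync id -> fall back to regular file-based recovery
    assertFalse(handler.canSkipPhase1(
        newMetadataSnapshot(UUIDs.randomBase64UUID(), "41", "41", 10),
        newMetadataSnapshot(UUIDs.randomBase64UUID(), "41", "41", 10)));
    // same sync id but mismatched seq_no stats -> recovery aborts
    // (AssertionError when assertions are enabled, IllegalStateException otherwise)
    expectThrows(AssertionError.class, () -> handler.canSkipPhase1(
        newMetadataSnapshot(syncId, "41", "41", 10),
        newMetadataSnapshot(syncId, "7", "41", 10)));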

void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<TimeValue> listener) {
StopWatch stopWatch = new StopWatch().start();
final ActionListener<Void> wrappedListener = ActionListener.wrap(
@@ -89,7 +89,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -649,6 +651,43 @@ public void writeFileChunk(StoreFileMetaData md, long position, BytesReference c
store.close();
}

    public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception {
        IndexShard shard = mock(IndexShard.class);
        when(shard.state()).thenReturn(IndexShardState.STARTED);
        RecoverySourceHandler handler = new RecoverySourceHandler(
            shard, new TestRecoveryTargetHandler(), getStartRecoveryRequest(), between(1, 16), between(1, 4));

        String syncId = UUIDs.randomBase64UUID();
        int numDocs = between(0, 1000);
        long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
        long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
        assertTrue(handler.canSkipPhase1(
            newMetadataSnapshot(syncId, Long.toString(localCheckpoint), Long.toString(maxSeqNo), numDocs),
            newMetadataSnapshot(syncId, Long.toString(localCheckpoint), Long.toString(maxSeqNo), numDocs)));

        AssertionError error = expectThrows(AssertionError.class, () -> {
            long localCheckpointOnTarget = randomValueOtherThan(localCheckpoint,
                () -> randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE));
            long maxSeqNoOnTarget = randomValueOtherThan(maxSeqNo,
                () -> randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE));
            handler.canSkipPhase1(newMetadataSnapshot(syncId, Long.toString(localCheckpoint), Long.toString(maxSeqNo), numDocs),
                newMetadataSnapshot(syncId, Long.toString(localCheckpointOnTarget), Long.toString(maxSeqNoOnTarget), numDocs));
        });
        assertThat(error.getMessage(), containsString("try to recover [index][1] with sync id but seq_no stats are mismatched:"));
    }

    private Store.MetadataSnapshot newMetadataSnapshot(String syncId, String localCheckpoint, String maxSeqNo, int numDocs) {
        Map<String, String> userData = new HashMap<>();
        userData.put(Engine.SYNC_COMMIT_ID, syncId);
        if (localCheckpoint != null) {
            userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
        }
        if (maxSeqNo != null) {
            userData.put(SequenceNumbers.MAX_SEQ_NO, maxSeqNo);
        }
        return new Store.MetadataSnapshot(Collections.emptyMap(), userData, numDocs);
    }

private Store newStore(Path path) throws IOException {
return newStore(path, true);
}
