
Promote replica on the highest version node #25277

Merged: 11 commits merged into elastic:master on Jun 29, 2017

Conversation

@dakrone (Member) commented Jun 16, 2017

This changes replica selection to prefer replicas on the node with the highest version
when choosing a replacement to promote after the primary shard fails.

Consider this situation:

  • A replica on a 5.6 node
  • Another replica on a 6.0 node
  • The primary on a 6.0 node

The primary shard sends sequence numbers to the replica on the 6.0 node but skips
sending them to the replica on the 5.6 node. Now assume the primary shard fails and
(prior to this change) the replica on the 5.6 node is promoted to primary: it has no
knowledge of sequence numbers, while the replica on the 6.0 node expects sequence
numbers that it will never receive.

Relates to #10708
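
For illustration, the selection boils down to a comparator over node versions; a minimal sketch of the resulting RoutingNodes method (mirroring the stream-based form discussed in the review below, not necessarily the exact final patch) is:

// Pick the active, non-primary copy whose node has the highest version, treating an
// unknown (null) node/version as lowest so such a replica is only chosen as a last resort.
// Uses java.util.Comparator; assumes this lives in RoutingNodes, where node(nodeId) may be null.
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) {
    return assignedShards(shardId).stream()
        .filter(shr -> !shr.primary() && shr.active())
        // ignore shards whose node has already left the RoutingNodes
        .filter(shr -> node(shr.currentNodeId()) != null)
        .max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
            Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
        .orElse(null);
}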

@dakrone (Member Author) commented Jun 19, 2017

@jasontedor could you take a look at this please?

@ywelsch (Contributor) left a comment:

I've left some comments and suggestions. This also needs to go into 5.6, as a 5.6 master in a mixed 5.6/6.x cluster should be running this code as well.

// calls this method with an out-of-date RoutingNodes, where the version might not
// be accessible. Therefore, we need to protect against the version being null
// (meaning the node will be going away).
Version replicaNodeVersion = nodesToVersions.get(shardRouting.currentNodeId());
ywelsch (Contributor):

The nodesToVersions map is not needed. You can get the version using node(shardRouting.currentNodeId()).node().getVersion().
Not having this extra nodesToVersions map also solves consistency issues where entries are removed from nodesToShards but not nodesToVersions.

dakrone (Member Author):

Ahh thanks, I didn't know about that, I removed the map.

if (replicaNodeVersion == null && candidate == null) {
// Only use this replica if there are no other candidates
candidate = shardRouting;
} else if (highestVersionSeen == null || (replicaNodeVersion != null && replicaNodeVersion.after(highestVersionSeen))) {
ywelsch (Contributor):

This method looks like it could enjoy Java 8 lambdas, for example something along the lines of:

return assignedShards(shardId).stream()
    .filter(shr -> !shr.primary() && shr.active())
    .max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
        Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
    .orElse(null);

dakrone (Member Author):

mutters something about Java pretending to be a functional language

I don't agree with the word "enjoy" (I think the lambda version is messier than the non-lambda version since there are no monads in Java), but I did this because you asked for it.

// add a single node
clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder()
.add(newNode("node1-5.x", Version.V_5_6_0)))
ywelsch (Contributor):

Can you generalize the test to use two arbitrary (but distinct) versions? i.e. VersionUtils.randomVersion()
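
For reference, a hedged sketch of picking two arbitrary but distinct random versions in the test; the exact VersionUtils.randomVersion signature used here is an assumption about the test framework:

// assumption: VersionUtils.randomVersion(random()) returns an arbitrary known Version
Version primaryNodeVersion = VersionUtils.randomVersion(random());
Version replicaNodeVersion = VersionUtils.randomVersion(random());
while (replicaNodeVersion.equals(primaryNodeVersion)) {
    replicaNodeVersion = VersionUtils.randomVersion(random());
}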

dakrone (Member Author):

No? Currently the only situation that is valid for a mixed-major-version cluster is 5.6 and 6.0; we don't support mixed clusters of any other versions, and 5.6.1 isn't out yet. I'm not sure how randomization would help here, other than triggering some other version-related failures :)

ywelsch (Contributor):

this PR does more than just order 5.6 versus 6.x nodes. It also orders among 6.0 and 6.1 nodes, which is left untested here. Either we restrict the "Promote replica on the highest version node" logic to only order 6.x nodes before 5.6 (and leave 6.0 and 6.1 unordered), or we test that this logic also properly orders 6.0 and 6.1. I agree there is no need to test 5.1 and 6.2.

dakrone (Member Author):

Okay, I randomized the versions

ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId);
logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica);

// fail the primary shard, check replicas get removed as well...
ywelsch (Contributor):

replicas should not be removed? Looks like copy pasta from another test

dakrone (Member Author):

Yep, I fixed this, thanks

@@ -556,4 +558,118 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
}

public void testReplicaOnNewestVersionIsPromoted() {
ywelsch (Contributor):

This test checks one specific scenario. I think it can easily be generalized in the way of IndicesClusterStateServiceRandomUpdatesTests so that it simulates a large range of scenarios.
Essentially it boils down to: creating a few nodes with random versions (see randomInitialClusterState of IndicesClusterStateServiceRandomUpdatesTests), allocating a few shards to those nodes (see ClusterStateChanges.createIndex), then failing some of the shards including the primary (see ClusterStateChanges.applyFailedShards) or failing some of the nodes including the one holding the primary (see ClusterStateChanges.deassociateDeadNodes), and finally checking that the new primary is on the node with the highest version.
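
A rough sketch of that flow; the cluster helper plus the randomInitialClusterState and randomCreateIndexRequest names are placeholders/assumptions here, mirroring the snippets quoted later in this review:

// hedged skeleton of the generalized test described above
public void testReplicaOnNewestVersionIsPromotedRandomized() {
    // 1) a few nodes on random (possibly different) versions
    ClusterState state = randomInitialClusterState();
    // 2) allocate a few shards to those nodes (cf. ClusterStateChanges.createIndex)
    state = cluster.createIndex(state, randomCreateIndexRequest());
    // 3) start the primaries, then (some of) the replicas
    state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
    state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
    // 4) fail some shards (incl. primaries) via ClusterStateChanges.applyFailedShards,
    //    or some nodes via ClusterStateChanges.deassociateDeadNodes
    // 5) assert that each new primary is on a node whose version is at least as high
    //    as that of every remaining active replica of the same shard
}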

@dakrone (Member Author) commented Jun 22, 2017

@ywelsch I added a test similar to what we talked about, as well as addressing your other feedback, please take another look!

@ywelsch (Contributor) left a comment:

I left a few more comments about the tests.

}

logger.info("--> starting shards");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
ywelsch (Contributor):

extra semicolon


logger.info("--> starting shards");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
state = cluster.reroute(state, new ClusterRerouteRequest());
ywelsch (Contributor):

reroute happens as part of applyStartedShards in the line above

state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
state = cluster.reroute(state, new ClusterRerouteRequest());
logger.info("--> starting replicas");
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));;
ywelsch (Contributor):

there is no guarantee that all replicas are started (as we have throttling). It's good to test the situation where not all replicas are started though, so maybe we can call applyStartedShards a random number of times.
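
For example, roughly (randomIntBetween comes from the test base class):

// start shards in a random number of rounds so that not every replica is necessarily started
for (int i = 0; i < randomIntBetween(1, 3); i++) {
    state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
}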

for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
if (shardRouting.primary() && randomBoolean()) {
ShardRouting replicaToBePromoted = state.getRoutingNodes()
.activeReplicaWithHighestVersion(shardRouting.shardId());
ywelsch (Contributor):

you're testing the method activeReplicaWithHighestVersion here using the method itself? I see no checks here that the primary is indeed on the node with the highest version. I think for the purpose of the test it is sufficient to check that:

  1. if there was at least one active replica when the primary was failed, a new active primary got assigned; and
  2. the new active primary is on a node with a version higher than or equal to that of the remaining replicas.

dakrone (Member Author):

I changed the test to verify candidates without using the activeReplicaWithHighestVersion method

Settings.Builder settingsBuilder = Settings.builder()
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3))
.put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3))
.put("index.routing.allocation.total_shards_per_node", 1);
ywelsch (Contributor):

why this?

dakrone (Member Author):

I removed this :)

ClusterState previousState = state;
// apply cluster state to nodes (incl. master)
for (DiscoveryNode node : state.nodes()) {
IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node);
ywelsch (Contributor):

This test does not require IndicesClusterStateService, only the ClusterStateChanges class. All the code in this block can go away; it does not add anything to the test.
The test can be put into FailedShardsRoutingTests.

dakrone (Member Author) commented Jun 27, 2017:

it does use the randomInitialClusterState method, which I'm not sure we want to duplicate; is it worth coupling the tests just to put it in the other location? (edit: I misread and thought two methods were used; only one is)

ywelsch (Contributor):

randomInitialClusterState is 5 lines. I think we can duplicate that :)

dakrone (Member Author):

Okay, I moved it.

List<FailedShard> shardsToFail = new ArrayList<>();
logger.info("--> found replica that should be promoted: {}", replicaToBePromoted);
logger.info("--> failing shard {}", shardRouting);
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception()));
ywelsch (Contributor):

you're testing only one failure at a time.
Instead, the test could select a subset of the primary shards at random (and also a few replica shards) and fail them in one go.
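
Roughly, as a hedged sketch building on the existing loop (rarely() comes from the randomized-testing base class):

// collect a random subset of started primaries, plus occasionally a replica, and fail them together
List<FailedShard> shardsToFail = new ArrayList<>();
for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) {
    if ((shardRouting.primary() && randomBoolean()) || (shardRouting.primary() == false && rarely())) {
        shardsToFail.add(new FailedShard(shardRouting, "failed shard", new Exception()));
    }
}
state = cluster.applyFailedShards(state, shardsToFail);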

.filter(shr -> !shr.primary() && shr.active())
.filter(shr -> node(shr.currentNodeId()) != null)
.max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(),
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion))))
ywelsch (Contributor):

can you re-add the comment explaining why we need to consider "null" here?

dakrone (Member Author):

Re-added this comment

logger.info("--> found replica that should be promoted: {}", replicaToBePromoted);
logger.info("--> failing shard {}", shardRouting);
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception()));
state = cluster.applyFailedShards(state, shardsToFail);
ywelsch (Contributor):

an alternative to explicit shard failing is to remove nodes where the shards are allocated (i.e. when a node disconnects from the cluster).
This would also test the scenario where DiscoveryNode is null in the RoutingNode.
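
A hedged sketch of that alternative; the primaryNodeId variable and the exact deassociateDeadNodes signature are assumptions:

// simulate a node leaving the cluster instead of explicitly failing its shards
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()).remove(primaryNodeId).build();
state = ClusterState.builder(state).nodes(newNodes).build();
// let allocation react to the dead node (signature assumed for ClusterStateChanges.deassociateDeadNodes)
state = cluster.deassociateDeadNodes(state, true, "node left");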

@@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
ywelsch (Contributor):

unused import?

@dakrone (Member Author) commented Jun 27, 2017

@ywelsch I pushed a few commits addressing your feedback, thanks again for taking a look at this.

@ywelsch (Contributor) left a comment:

Left a few more minor comments. Address as you see fit.

DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes())
.add(createNode()).build();
state = ClusterState.builder(state).nodes(newNodes).build();
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave
ywelsch (Contributor):

this comment is stale (there are no nodes removed here)

state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave
}

// Log the shard versions (for debugging if necessary)
ywelsch (Contributor):

Log the node versions?
Can also be done directly in the loop where you are adding the nodes :-)

state = cluster.createIndex(state, request);
assertTrue(state.metaData().hasIndex(name));
}
state = cluster.reroute(state, new ClusterRerouteRequest());
ywelsch (Contributor):

this is not needed. createIndex automatically reroutes.

state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));
}

logger.info("--> state before failing shards: {}", state);
ywelsch (Contributor):

👍

}

private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
for (ObjectObjectCursor<String, DiscoveryNode> entry : state.getNodes().getDataNodes()) {
ywelsch (Contributor):

no need for iteration here; you can get the node directly by calling state.getNodes().get(shardRouting.currentNodeId()) (which will return null if no node is found)
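
i.e. the helper could shrink to something like this sketch:

private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) {
    // direct lookup; returns null if the node is no longer in the cluster state
    DiscoveryNode node = state.getNodes().get(shardRouting.currentNodeId());
    return node == null ? null : node.getVersion();
}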

.filter(s -> currentState.getRoutingNodes().node(s.currentNodeId()) != null)
.collect(Collectors.toSet());
// If we find a replica and at least another candidate
if (replicaToBePromoted != null && candidates.size() > 0) {
ywelsch (Contributor):

we don't need to determine replicaToBePromoted. candidates also does not need to be filtered with !s.equals(replicaToBePromoted). It's ok to just check candidates.size() > 0 here to see whether there is going to be a new primary. In that case, we fail the primary plus random(0, candidates.size() - 1) replicas and check afterwards that the new primary is on a node whose version is at least as high as that of all remaining replicas.
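
A hedged sketch of that final check; indexName, shardId, and the getNodeVersion helper are placeholders from the surrounding test:

// after applying the failures: the promoted primary must not be on an older node
// than any remaining active replica of the same shard
ShardRouting newPrimary = state.routingTable().index(indexName).shard(shardId.id()).primaryShard();
Version newPrimaryVersion = getNodeVersion(newPrimary, state);
for (ShardRouting replica : state.routingTable().index(indexName).shard(shardId.id()).replicaShards()) {
    if (replica.active()) {
        assertTrue(newPrimaryVersion.onOrAfter(getNodeVersion(replica, state)));
    }
}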

@dakrone (Member Author) commented Jun 28, 2017

Thanks @ywelsch, I rewrote the check to be more in line with what we discussed.

@dakrone dakrone merged commit 22ff76d into elastic:master Jun 29, 2017
dakrone added a commit that referenced this pull request Jun 29, 2017
* Promote replica on the highest version node

This changes the replica selection to prefer to return replicas on the highest
version when choosing a replacement to promote when the primary shard fails.

Consider this situation:

- A replica on a 5.6 node
- Another replica on a 6.0 node
- The primary on a 6.0 node

The primary shard is sending sequence numbers to the replica on the 6.0 node and
skipping sending them for the 5.6 node. Now assume that the primary shard fails
and (prior to this change) the replica on 5.6 node gets promoted to primary, it
now has no knowledge of sequence numbers and the replica on the 6.0 node will be
expecting sequence numbers but will never receive them.

Relates to #10708

* Switch from map of node to version to retrieving the version from the node

* Remove uneeded null check

* You can pretend you're a functional language Java, but you're not fooling me.

* Randomize node versions

* Add test with random cluster state with multiple versions that fails shards

* Re-add comment and remove extra import

* Remove unneeded stuff, randomly start replicas a few more times

* Move test into FailedNodeRoutingTests

* Make assertions actually test replica version promotion

* Rewrite test, taking Yannick's feedback into account
jasontedor added a commit that referenced this pull request Jun 30, 2017
* master: (129 commits)
  Add doc note regarding explicit publish host
  Fix typo in name of test
  Add additional test for sequence-number recovery
  WrapperQueryBuilder should also rewrite the parsed query.
  Remove dead code and stale Javadoc
  Update defaults in documentation (#25483)
  [DOCS] Add docs-dir to Painless (#25482)
  Add concurrent deprecation logger test
  [DOCS] Update shared attributes for Elasticsearch (#25479)
  Use LRU set to reduce repeat deprecation messages
  Add NioTransport threads to thread name checks (#25477)
  Add shortcut for AbstractQueryBuilder.parseInnerQueryBuilder to QueryShardContext
  Prevent channel enqueue after selector close (#25478)
  Fix Java 9 compilation issue
  Remove unregistered `transport.netty.*` settings (#25476)
  Handle ping correctly in NioTransport (#25462)
  Tests: Remove platform specific assertion in NioSocketChannelTests
  Remove QueryParseContext from parsing QueryBuilders (#25448)
  Promote replica on the highest version node (#25277)
  test: added not null assertion
  ...
@lcawl added the :Distributed/Distributed label and removed the :Allocation label on Feb 13, 2018
@clintongormley added the :Distributed/Engine label and removed the :Sequence IDs label on Feb 14, 2018
Labels: blocker, :Distributed/Distributed, :Distributed/Engine, >enhancement, v5.6.0, v6.0.0-beta1