From 3aa3edc647187699e12019b6008c2cbfa30c85fd Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 15 Jun 2017 16:26:33 -0600 Subject: [PATCH 01/11] Promote replica on the highest version node This changes the replica selection to prefer to return replicas on the highest version when choosing a replacement to promote when the primary shard fails. Consider this situation: - A replica on a 5.6 node - Another replica on a 6.0 node - The primary on a 6.0 node The primary shard is sending sequence numbers to the replica on the 6.0 node and skipping sending them for the 5.6 node. Now assume that the primary shard fails and (prior to this change) the replica on the 5.6 node gets promoted to primary. It now has no knowledge of sequence numbers, and the replica on the 6.0 node will be expecting sequence numbers but will never receive them. Relates to #10708 --- .../cluster/routing/RoutingNodes.java | 32 ++++- .../allocation/FailedShardsRoutingTests.java | 118 +++++++++++++++++- 2 files changed, 144 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index e93d071b0cf33..f2b0a39cf95c3 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.Assertions; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -67,6 +68,8 @@ public class RoutingNodes implements Iterable { private final Map nodesToShards = new HashMap<>(); + private final Map nodesToVersions = new HashMap<>(); + private final UnassignedShards unassignedShards = new UnassignedShards(this); private final Map> 
assignedShards = new HashMap<>(); @@ -94,6 +97,7 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { // fill in the nodeToShards with the "live" nodes for (ObjectCursor cursor : clusterState.nodes().getDataNodes().values()) { nodesToShards.put(cursor.value.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order + nodesToVersions.put(cursor.value.getId(), cursor.value.getVersion()); } // fill in the inverse of node -> shards allocated @@ -320,14 +324,32 @@ public ShardRouting activePrimary(ShardId shardId) { /** * Returns one active replica shard for the given shard id or null if * no active replica is found. + * + * Since replicas could possibly be on nodes with a older version of ES than + * the primary is, this will return replicas on the highest version of ES. + * */ - public ShardRouting activeReplica(ShardId shardId) { + public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { + Version highestVersionSeen = null; + ShardRouting candidate = null; for (ShardRouting shardRouting : assignedShards(shardId)) { if (!shardRouting.primary() && shardRouting.active()) { - return shardRouting; + // It's possible for replicaNodeVersion to be null, when deassociating dead nodes + // that have been removed, the shards are failed, and part of the shard failing + // calls this method with an out-of-date RoutingNodes, where the version might not + // be accessible. Therefore, we need to protect against the version being null + // (meaning the node will be going away). 
+ Version replicaNodeVersion = nodesToVersions.get(shardRouting.currentNodeId()); + if (replicaNodeVersion == null && candidate == null) { + // Only use this replica if there are no other candidates + candidate = shardRouting; + } else if (highestVersionSeen == null || (replicaNodeVersion != null && replicaNodeVersion.after(highestVersionSeen))) { + highestVersionSeen = replicaNodeVersion; + candidate = shardRouting; + } } } - return null; + return candidate; } /** @@ -567,7 +589,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId if (failedShard.relocatingNodeId() == null) { if (failedShard.primary()) { // promote active replica to primary if active replica exists (only the case for shadow replicas) - ShardRouting activeReplica = activeReplica(failedShard.shardId()); + ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); if (activeReplica == null) { moveToUnassigned(failedShard, unassignedInfo); } else { @@ -596,7 +618,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId assert failedShard.active(); if (failedShard.primary()) { // promote active replica to primary if active replica exists - ShardRouting activeReplica = activeReplica(failedShard.shardId()); + ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); if (activeReplica == null) { moveToUnassigned(failedShard, unassignedInfo); } else { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 6063faba156ff..9703de2385ec5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -19,12 +19,14 @@ package org.elasticsearch.cluster.routing.allocation; +import 
com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; @@ -499,7 +501,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() { Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0))); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); - ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId); + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); // fail the primary shard, check replicas get removed as well... 
@@ -556,4 +558,118 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); } + + public void testReplicaOnNewestVersionIsPromoted() { + AllocationService allocation = createAllocationService(Settings.builder().build()); + + MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)) .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData).routingTable(initialRoutingTable).build(); + + ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0); + + // add a single node + clusterState = ClusterState.builder(clusterState).nodes( + DiscoveryNodes.builder() + .add(newNode("node1-5.x", Version.V_5_6_0))) + .build(); + clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build(); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3)); + + // start primary shard + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3)); + + // add another 5.6 node + clusterState = ClusterState.builder(clusterState).nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node2-5.x", Version.V_5_6_0))) + .build(); + + // start the shards, should have 1 primary and 1 replica 
available + clusterState = allocation.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); + + clusterState = ClusterState.builder(clusterState).nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node3-6.x", Version.V_6_0_0_alpha3)) + .add(newNode("node4-6.x", Version.V_6_0_0_alpha3))) + .build(); + + // start all the replicas + clusterState = allocation.reroute(clusterState, "reroute"); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); + assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); + + ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica); + + // fail the primary shard, check replicas get removed as well... 
+ ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); + ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail); + assertThat(newState, not(equalTo(clusterState))); + clusterState = newState; + // the primary gets allocated on another node + assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3)); + + ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); + assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId())); + + Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion(); + assertNotNull(replicaNodeVersion); + logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion); + + for (ObjectCursor cursor : clusterState.nodes().getDataNodes().values()) { + if ("node1".equals(cursor.value.getId())) { + // Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check + continue; + } + Version nodeVer = cursor.value.getVersion(); + assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, + replicaNodeVersion.onOrAfter(nodeVer)); + } + + startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); + logger.info("--> failing primary shard a second time, should select: {}", startedReplica); + + // fail the primary shard again, and ensure the same thing happens + primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); + newState = allocation.applyFailedShard(clusterState, primaryShardToFail); + assertThat(newState, not(equalTo(clusterState))); + clusterState = newState; + // the primary gets allocated on another node + 
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); + + newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); + assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); + assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId())); + + replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion(); + assertNotNull(replicaNodeVersion); + logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion); + + for (ObjectCursor cursor : clusterState.nodes().getDataNodes().values()) { + if ("node1".equals(cursor.value.getId())) { + // Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check + continue; + } + Version nodeVer = cursor.value.getVersion(); + assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, + replicaNodeVersion.onOrAfter(nodeVer)); + } + } } From 1273e999a58db6628af8894a2c5e4440d83d3ccd Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 20 Jun 2017 10:27:12 -0600 Subject: [PATCH 02/11] Switch from map of node to version to retrieving the version from the node --- .../cluster/routing/RoutingNodes.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index f2b0a39cf95c3..a73d2c4d5c540 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -68,8 +68,6 @@ public class RoutingNodes implements Iterable { private final Map nodesToShards = new HashMap<>(); - private final Map nodesToVersions = new HashMap<>(); - private final UnassignedShards unassignedShards = new UnassignedShards(this); 
private final Map> assignedShards = new HashMap<>(); @@ -97,7 +95,6 @@ public RoutingNodes(ClusterState clusterState, boolean readOnly) { // fill in the nodeToShards with the "live" nodes for (ObjectCursor cursor : clusterState.nodes().getDataNodes().values()) { nodesToShards.put(cursor.value.getId(), new LinkedHashMap<>()); // LinkedHashMap to preserve order - nodesToVersions.put(cursor.value.getId(), cursor.value.getVersion()); } // fill in the inverse of node -> shards allocated @@ -339,13 +336,16 @@ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { // calls this method with an out-of-date RoutingNodes, where the version might not // be accessible. Therefore, we need to protect against the version being null // (meaning the node will be going away). - Version replicaNodeVersion = nodesToVersions.get(shardRouting.currentNodeId()); - if (replicaNodeVersion == null && candidate == null) { - // Only use this replica if there are no other candidates - candidate = shardRouting; - } else if (highestVersionSeen == null || (replicaNodeVersion != null && replicaNodeVersion.after(highestVersionSeen))) { - highestVersionSeen = replicaNodeVersion; - candidate = shardRouting; + RoutingNode replicaNode = node(shardRouting.currentNodeId()); + if (replicaNode != null && replicaNode.node() != null) { + Version replicaNodeVersion = replicaNode.node().getVersion(); + if (replicaNodeVersion == null && candidate == null) { + // Only use this replica if there are no other candidates + candidate = shardRouting; + } else if (highestVersionSeen == null || (replicaNodeVersion != null && replicaNodeVersion.after(highestVersionSeen))) { + highestVersionSeen = replicaNodeVersion; + candidate = shardRouting; + } } } } From 879eef00dd73f7a6ba09d0d2cd4bb774755f6a20 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 20 Jun 2017 10:38:37 -0600 Subject: [PATCH 03/11] Remove unneeded null check --- .../org/elasticsearch/cluster/routing/RoutingNodes.java | 8 ++++---- 1 file 
changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index a73d2c4d5c540..1f1d2217ae527 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -339,12 +339,12 @@ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { RoutingNode replicaNode = node(shardRouting.currentNodeId()); if (replicaNode != null && replicaNode.node() != null) { Version replicaNodeVersion = replicaNode.node().getVersion(); - if (replicaNodeVersion == null && candidate == null) { - // Only use this replica if there are no other candidates - candidate = shardRouting; - } else if (highestVersionSeen == null || (replicaNodeVersion != null && replicaNodeVersion.after(highestVersionSeen))) { + if (highestVersionSeen == null || replicaNodeVersion.after(highestVersionSeen)) { highestVersionSeen = replicaNodeVersion; candidate = shardRouting; + } else if (candidate == null) { + // Only use this replica if there are no other candidates + candidate = shardRouting; } } } From cb42f65b5a87fba77c5287bc111c9e86810b044f Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 20 Jun 2017 10:58:02 -0600 Subject: [PATCH 04/11] You can pretend you're a functional language Java, but you're not fooling me. 
--- .../cluster/routing/RoutingNodes.java | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 1f1d2217ae527..45cb9188efc42 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -327,29 +327,12 @@ public ShardRouting activePrimary(ShardId shardId) { * */ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { - Version highestVersionSeen = null; - ShardRouting candidate = null; - for (ShardRouting shardRouting : assignedShards(shardId)) { - if (!shardRouting.primary() && shardRouting.active()) { - // It's possible for replicaNodeVersion to be null, when deassociating dead nodes - // that have been removed, the shards are failed, and part of the shard failing - // calls this method with an out-of-date RoutingNodes, where the version might not - // be accessible. Therefore, we need to protect against the version being null - // (meaning the node will be going away). 
- RoutingNode replicaNode = node(shardRouting.currentNodeId()); - if (replicaNode != null && replicaNode.node() != null) { - Version replicaNodeVersion = replicaNode.node().getVersion(); - if (highestVersionSeen == null || replicaNodeVersion.after(highestVersionSeen)) { - highestVersionSeen = replicaNodeVersion; - candidate = shardRouting; - } else if (candidate == null) { - // Only use this replica if there are no other candidates - candidate = shardRouting; - } - } - } - } - return candidate; + return assignedShards(shardId).stream() + .filter(shr -> !shr.primary() && shr.active()) + .filter(shr -> node(shr.currentNodeId()) != null) + .max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(), + Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)))) + .orElse(null); } /** From 54edaf87f6513dfe97017ee0f7c3f8cdc2534490 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 20 Jun 2017 12:07:47 -0600 Subject: [PATCH 05/11] Randomize node versions --- .../allocation/FailedShardsRoutingTests.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 9703de2385ec5..2eedeba63f307 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.VersionUtils; import java.util.ArrayList; import java.util.Collections; @@ -602,8 +603,8 @@ public void testReplicaOnNewestVersionIsPromoted() { clusterState = ClusterState.builder(clusterState).nodes( DiscoveryNodes.builder(clusterState.nodes()) - 
.add(newNode("node3-6.x", Version.V_6_0_0_alpha3)) - .add(newNode("node4-6.x", Version.V_6_0_0_alpha3))) + .add(newNode("node3-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null))) + .add(newNode("node4-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null)))) .build(); // start all the replicas @@ -617,7 +618,7 @@ public void testReplicaOnNewestVersionIsPromoted() { ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica); - // fail the primary shard, check replicas get removed as well... + // fail the primary shard again and make sure the correct replica is promoted ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail); assertThat(newState, not(equalTo(clusterState))); @@ -647,15 +648,15 @@ public void testReplicaOnNewestVersionIsPromoted() { logger.info("--> failing primary shard a second time, should select: {}", startedReplica); // fail the primary shard again, and ensure the same thing happens - primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); - newState = allocation.applyFailedShard(clusterState, primaryShardToFail); + ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); + newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail); assertThat(newState, not(equalTo(clusterState))); clusterState = newState; // the primary gets allocated on another node assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); - assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); + assertThat(newPrimaryShard, 
not(equalTo(secondPrimaryShardToFail))); assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId())); replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion(); @@ -663,7 +664,8 @@ public void testReplicaOnNewestVersionIsPromoted() { logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion); for (ObjectCursor cursor : clusterState.nodes().getDataNodes().values()) { - if ("node1".equals(cursor.value.getId())) { + if (primaryShardToFail.currentNodeId().equals(cursor.value.getId()) || + secondPrimaryShardToFail.currentNodeId().equals(cursor.value.getId())) { // Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check continue; } From ef8c79d92bc393b8c944f206110afa06f9c1990e Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 22 Jun 2017 11:51:48 -0600 Subject: [PATCH 06/11] Add test with random cluster state with multiple versions that fails shards --- ...ClusterStateServiceRandomUpdatesTests.java | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index adfc6609d8f93..925989daecdd8 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.cluster; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; @@ -50,6 +51,7 @@ import org.elasticsearch.index.Index; import 
org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -72,6 +74,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -139,6 +143,87 @@ public void testRandomClusterStateUpdates() { logger.info("Final cluster state: {}", state); } + public void testRandomClusterPromotesNewestReplica() { + // we have an IndicesClusterStateService per node in the cluster + final Map clusterStateServiceMap = new HashMap<>(); + ClusterState state = randomInitialClusterState(clusterStateServiceMap, MockIndicesService::new); + + // randomly add nodes of mixed versions + logger.info("--> adding random nodes"); + for (int i = 0; i < randomIntBetween(4, 8); i++) { + DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()) + .add(createRandomVersionNode()).build(); + state = ClusterState.builder(state).nodes(newNodes).build(); + state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave + updateNodes(state, clusterStateServiceMap, MockIndicesService::new); + } + + // Log the shard versions (for debugging if necessary) + for (ObjectCursor cursor : state.nodes().getDataNodes().values()) { + Version nodeVer = cursor.value.getVersion(); + logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer); + } + + // randomly create some indices + logger.info("--> creating 
some indices"); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT); + Settings.Builder settingsBuilder = Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) + .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3)) + .put("index.routing.allocation.total_shards_per_node", 1); + CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metaData().hasIndex(name)); + } + state = cluster.reroute(state, new ClusterRerouteRequest()); + + ClusterState previousState = state; + // apply cluster state to nodes (incl. master) + for (DiscoveryNode node : state.nodes()) { + IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); + ClusterState localState = adaptClusterStateToLocalNode(state, node); + ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node); + final ClusterChangedEvent event = new ClusterChangedEvent("simulating change", localState, previousLocalState); + indicesClusterStateService.applyClusterState(event); + + // check that cluster state has been properly applied to node + assertClusterStateMatchesNodeState(localState, indicesClusterStateService); + } + + logger.info("--> starting shards"); + state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));; + state = cluster.reroute(state, new ClusterRerouteRequest()); + logger.info("--> starting replicas"); + state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));; + state = cluster.reroute(state, new ClusterRerouteRequest()); + + logger.info("--> state before failing shards: {}", state); + for (int i = 0; i < randomIntBetween(5, 10); i++) { + for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { + if 
(shardRouting.primary() && randomBoolean()) { + ShardRouting replicaToBePromoted = state.getRoutingNodes() + .activeReplicaWithHighestVersion(shardRouting.shardId()); + if (replicaToBePromoted != null) { + Version replicaNodeVersion = state.nodes().getDataNodes() + .get(replicaToBePromoted.currentNodeId()).getVersion(); + List shardsToFail = new ArrayList<>(); + logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); + logger.info("--> failing shard {}", shardRouting); + shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); + state = cluster.applyFailedShards(state, shardsToFail); + ShardRouting newPrimary = state.routingTable().index(shardRouting.index()) + .shard(shardRouting.id()).primaryShard(); + + assertThat(newPrimary.allocationId().getId(), + equalTo(replicaToBePromoted.allocationId().getId())); + } + } + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + } + } + /** * This test ensures that when a node joins a brand new cluster (different cluster UUID), * different from the cluster it was previously a part of, the in-memory index data structures @@ -388,6 +473,16 @@ protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) { Version.CURRENT); } + protected DiscoveryNode createRandomVersionNode(DiscoveryNode.Role... 
mustHaveRoles) { + Set roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values()))); + for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) { + roles.add(mustHaveRole); + } + final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); + return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, + VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null)); + } + private static ClusterState adaptClusterStateToLocalNode(ClusterState state, DiscoveryNode node) { return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build(); } From 27469f50430f202de55a34ca49541910cd49e2e8 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 27 Jun 2017 13:31:25 -0600 Subject: [PATCH 07/11] Re-add comment and remove extra import --- .../org/elasticsearch/cluster/routing/RoutingNodes.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 45cb9188efc42..8268b98f34dc2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.Assertions; -import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -327,6 +326,11 @@ public ShardRouting activePrimary(ShardId shardId) { * */ public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { + // It's possible for replicaNodeVersion to be null, when deassociating dead nodes + // that have been removed, the shards are failed, and part of the shard failing + // 
calls this method with an out-of-date RoutingNodes, where the version might not + // be accessible. Therefore, we need to protect against the version being null + // (meaning the node will be going away). return assignedShards(shardId).stream() .filter(shr -> !shr.primary() && shr.active()) .filter(shr -> node(shr.currentNodeId()) != null) From 4da03ea04622f64f570eeffc53959da75f2cc480 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 27 Jun 2017 13:33:28 -0600 Subject: [PATCH 08/11] Remove unneeded stuff, randomly start replicas a few more times --- ...ClusterStateServiceRandomUpdatesTests.java | 25 +++++-------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 925989daecdd8..40f012d7254d9 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -155,7 +155,6 @@ public void testRandomClusterPromotesNewestReplica() { .add(createRandomVersionNode()).build(); state = ClusterState.builder(state).nodes(newNodes).build(); state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave - updateNodes(state, clusterStateServiceMap, MockIndicesService::new); } // Log the shard versions (for debugging if necessary) @@ -170,8 +169,7 @@ public void testRandomClusterPromotesNewestReplica() { String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT); Settings.Builder settingsBuilder = Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) - .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3)) - .put("index.routing.allocation.total_shards_per_node", 1); + .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3)); 
CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); assertTrue(state.metaData().hasIndex(name)); @@ -179,24 +177,13 @@ public void testRandomClusterPromotesNewestReplica() { state = cluster.reroute(state, new ClusterRerouteRequest()); ClusterState previousState = state; - // apply cluster state to nodes (incl. master) - for (DiscoveryNode node : state.nodes()) { - IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); - ClusterState localState = adaptClusterStateToLocalNode(state, node); - ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node); - final ClusterChangedEvent event = new ClusterChangedEvent("simulating change", localState, previousLocalState); - indicesClusterStateService.applyClusterState(event); - - // check that cluster state has been properly applied to node - assertClusterStateMatchesNodeState(localState, indicesClusterStateService); - } logger.info("--> starting shards"); - state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));; - state = cluster.reroute(state, new ClusterRerouteRequest()); - logger.info("--> starting replicas"); - state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));; - state = cluster.reroute(state, new ClusterRerouteRequest()); + state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); + logger.info("--> starting replicas a random number of times"); + for (int i = 0; i < randomIntBetween(1,4); i++) { + state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); + } logger.info("--> state before failing shards: {}", state); for (int i = 0; i < randomIntBetween(5, 10); i++) { From 56dfe808578decb2ec621f13af2d8c7a29c3721d Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 27 Jun 2017 
14:32:14 -0600 Subject: [PATCH 09/11] Move test into FailedNodeRoutingTests --- .../allocation/FailedNodeRoutingTests.java | 127 +++++++++++++++++- ...ClusterStateServiceRandomUpdatesTests.java | 82 ----------- 2 files changed, 126 insertions(+), 83 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 61a28897d587e..7f69ec4204d5f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -19,20 +19,47 @@ package org.elasticsearch.cluster.routing.allocation; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; - +import org.elasticsearch.common.util.set.Sets; +import 
org.elasticsearch.indices.cluster.AbstractIndicesClusterStateServiceTestCase; +import org.elasticsearch.indices.cluster.ClusterStateChanges; +import org.elasticsearch.indices.cluster.IndicesClusterStateService; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.Matchers.equalTo; @@ -91,4 +118,102 @@ public void testSimpleFailedNodeTest() { assertThat(routingNode.numberOfShardsWithState(INITIALIZING), equalTo(1)); } } + + public void testRandomClusterPromotesNewestReplica() throws InterruptedException { + + ThreadPool threadPool = new TestThreadPool(getClass().getName()); + ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + ClusterState state = randomInitialClusterState(); + + // randomly add nodes of mixed versions + logger.info("--> adding random nodes"); + for (int i = 0; i < randomIntBetween(4, 8); i++) { + DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()) + .add(createNode()).build(); + state = ClusterState.builder(state).nodes(newNodes).build(); + state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave + } + + // Log the shard versions (for debugging if necessary) + for (ObjectCursor cursor : 
state.nodes().getDataNodes().values()) { + Version nodeVer = cursor.value.getVersion(); + logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer); + } + + // randomly create some indices + logger.info("--> creating some indices"); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT); + Settings.Builder settingsBuilder = Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) + .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3)); + CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metaData().hasIndex(name)); + } + state = cluster.reroute(state, new ClusterRerouteRequest()); + + ClusterState previousState = state; + + logger.info("--> starting shards"); + state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); + logger.info("--> starting replicas a random number of times"); + for (int i = 0; i < randomIntBetween(1,4); i++) { + state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); + } + + logger.info("--> state before failing shards: {}", state); + for (int i = 0; i < randomIntBetween(5, 10); i++) { + for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { + if (shardRouting.primary() && randomBoolean()) { + ShardRouting replicaToBePromoted = state.getRoutingNodes() + .activeReplicaWithHighestVersion(shardRouting.shardId()); + if (replicaToBePromoted != null) { + Version replicaNodeVersion = state.nodes().getDataNodes() + .get(replicaToBePromoted.currentNodeId()).getVersion(); + List shardsToFail = new ArrayList<>(); + logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); + logger.info("--> failing shard {}", shardRouting); + shardsToFail.add(new 
FailedShard(shardRouting, "failed primary", new Exception())); + state = cluster.applyFailedShards(state, shardsToFail); + ShardRouting newPrimary = state.routingTable().index(shardRouting.index()) + .shard(shardRouting.id()).primaryShard(); + + assertThat(newPrimary.allocationId().getId(), + equalTo(replicaToBePromoted.allocationId().getId())); + } + } + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + } + terminate(threadPool); + } + + private static final AtomicInteger nodeIdGenerator = new AtomicInteger(); + + public ClusterState randomInitialClusterState() { + List allNodes = new ArrayList<>(); + DiscoveryNode localNode = createNode(DiscoveryNode.Role.MASTER); // local node is the master + allNodes.add(localNode); + // at least two nodes that have the data role so that we can allocate shards + allNodes.add(createNode(DiscoveryNode.Role.DATA)); + allNodes.add(createNode(DiscoveryNode.Role.DATA)); + for (int i = 0; i < randomIntBetween(2, 5); i++) { + allNodes.add(createNode()); + } + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[allNodes.size()])); + return state; + } + + + protected DiscoveryNode createNode(DiscoveryNode.Role... 
mustHaveRoles) { + Set roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values()))); + for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) { + roles.add(mustHaveRole); + } + final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); + return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, + VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null)); + } + } diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 40f012d7254d9..adfc6609d8f93 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.cluster; -import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; @@ -51,7 +50,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.repositories.RepositoriesService; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -74,8 +72,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static 
org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -143,74 +139,6 @@ public void testRandomClusterStateUpdates() { logger.info("Final cluster state: {}", state); } - public void testRandomClusterPromotesNewestReplica() { - // we have an IndicesClusterStateService per node in the cluster - final Map clusterStateServiceMap = new HashMap<>(); - ClusterState state = randomInitialClusterState(clusterStateServiceMap, MockIndicesService::new); - - // randomly add nodes of mixed versions - logger.info("--> adding random nodes"); - for (int i = 0; i < randomIntBetween(4, 8); i++) { - DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()) - .add(createRandomVersionNode()).build(); - state = ClusterState.builder(state).nodes(newNodes).build(); - state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave - } - - // Log the shard versions (for debugging if necessary) - for (ObjectCursor cursor : state.nodes().getDataNodes().values()) { - Version nodeVer = cursor.value.getVersion(); - logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer); - } - - // randomly create some indices - logger.info("--> creating some indices"); - for (int i = 0; i < randomIntBetween(2, 5); i++) { - String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT); - Settings.Builder settingsBuilder = Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) - .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3)); - CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metaData().hasIndex(name)); - } - state = cluster.reroute(state, new ClusterRerouteRequest()); - - ClusterState previousState = state; - - logger.info("--> starting shards"); - state = cluster.applyStartedShards(state, 
state.getRoutingNodes().shardsWithState(INITIALIZING)); - logger.info("--> starting replicas a random number of times"); - for (int i = 0; i < randomIntBetween(1,4); i++) { - state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); - } - - logger.info("--> state before failing shards: {}", state); - for (int i = 0; i < randomIntBetween(5, 10); i++) { - for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { - if (shardRouting.primary() && randomBoolean()) { - ShardRouting replicaToBePromoted = state.getRoutingNodes() - .activeReplicaWithHighestVersion(shardRouting.shardId()); - if (replicaToBePromoted != null) { - Version replicaNodeVersion = state.nodes().getDataNodes() - .get(replicaToBePromoted.currentNodeId()).getVersion(); - List shardsToFail = new ArrayList<>(); - logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); - logger.info("--> failing shard {}", shardRouting); - shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); - state = cluster.applyFailedShards(state, shardsToFail); - ShardRouting newPrimary = state.routingTable().index(shardRouting.index()) - .shard(shardRouting.id()).primaryShard(); - - assertThat(newPrimary.allocationId().getId(), - equalTo(replicaToBePromoted.allocationId().getId())); - } - } - state = cluster.reroute(state, new ClusterRerouteRequest()); - } - } - } - /** * This test ensures that when a node joins a brand new cluster (different cluster UUID), * different from the cluster it was previously a part of, the in-memory index data structures @@ -460,16 +388,6 @@ protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) { Version.CURRENT); } - protected DiscoveryNode createRandomVersionNode(DiscoveryNode.Role... 
mustHaveRoles) { - Set roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values()))); - for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) { - roles.add(mustHaveRole); - } - final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); - return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, - VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null)); - } - private static ClusterState adaptClusterStateToLocalNode(ClusterState state, DiscoveryNode node) { return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build(); } From 3ca757b679fa47d68a595dafb037a727e57185c9 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 27 Jun 2017 16:04:05 -0600 Subject: [PATCH 10/11] Make assertions actually test replica version promotion --- .../allocation/FailedNodeRoutingTests.java | 77 +++++++++++++------ 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 7f69ec4204d5f..9d12e9fcefff4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; @@ -57,6 +58,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.Collectors; import static 
org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -143,10 +145,10 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException // randomly create some indices logger.info("--> creating some indices"); for (int i = 0; i < randomIntBetween(2, 5); i++) { - String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT); + String name = "index_" + randomAlphaOfLength(8).toLowerCase(Locale.ROOT); Settings.Builder settingsBuilder = Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) - .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3)); + .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 4)) + .put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(2, 4)); CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); assertTrue(state.metaData().hasIndex(name)); @@ -158,37 +160,66 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException logger.info("--> starting shards"); state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); logger.info("--> starting replicas a random number of times"); - for (int i = 0; i < randomIntBetween(1,4); i++) { + for (int i = 0; i < randomIntBetween(1,10); i++) { state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); } logger.info("--> state before failing shards: {}", state); - for (int i = 0; i < randomIntBetween(5, 10); i++) { - for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { - if (shardRouting.primary() && randomBoolean()) { - ShardRouting replicaToBePromoted = state.getRoutingNodes() - .activeReplicaWithHighestVersion(shardRouting.shardId()); - if (replicaToBePromoted != null) { - Version replicaNodeVersion = 
state.nodes().getDataNodes() - .get(replicaToBePromoted.currentNodeId()).getVersion(); - List shardsToFail = new ArrayList<>(); - logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); - logger.info("--> failing shard {}", shardRouting); - shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); - state = cluster.applyFailedShards(state, shardsToFail); - ShardRouting newPrimary = state.routingTable().index(shardRouting.index()) - .shard(shardRouting.id()).primaryShard(); - - assertThat(newPrimary.allocationId().getId(), - equalTo(replicaToBePromoted.allocationId().getId())); + for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { + if (shardRouting.primary() && randomBoolean()) { + ShardRouting replicaToBePromoted = state.getRoutingNodes() + .activeReplicaWithHighestVersion(shardRouting.shardId()); + final ClusterState currentState = state; + // List of potential candidate replicas for promotion + Set candidates = state.getRoutingNodes().shardsWithState(STARTED) + .stream() + .filter(s -> !s.primary() && s.active()) + .filter(s -> s.shardId().equals(shardRouting.shardId())) + .filter(s -> !s.equals(replicaToBePromoted)) + .filter(s -> currentState.getRoutingNodes().node(s.currentNodeId()) != null) + .collect(Collectors.toSet()); + // If we find a replica and at least another candidate + if (replicaToBePromoted != null && candidates.size() > 0) { + logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); + logger.info("--> other candidates: {}", candidates); + + List shardsToFail = new ArrayList<>(); + logger.info("--> failing shard {}", shardRouting); + shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); + state = cluster.applyFailedShards(state, shardsToFail); + ShardRouting newPrimary = state.routingTable().index(shardRouting.index()) + .shard(shardRouting.id()).primaryShard(); + Version newPrimaryVersion = 
getNodeVersion(newPrimary, state); + + final ClusterState compareState = state; + logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary); + List candidateVersions = candidates.stream().map(sr -> { + Version version = getNodeVersion(sr, compareState); + logger.info("--> candidate on {} node; shard routing: {}", version, sr); + return version; + }).collect(Collectors.toList()); + for (Version candidateVer : candidateVersions) { + assertTrue("candidate was not on the newest version, new primary is on " + + newPrimaryVersion + " and there is a candidate on " + candidateVer, + candidateVer.onOrBefore(newPrimaryVersion)); } } - state = cluster.reroute(state, new ClusterRerouteRequest()); } + state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); } terminate(threadPool); } + private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) { + for (ObjectObjectCursor entry : state.getNodes().getDataNodes()) { + if (entry.key.equals(shardRouting.currentNodeId())) { + return state.getRoutingNodes().node(entry.key).node().getVersion(); + } + } + fail("shard is not assigned to a node"); + return null; + } + private static final AtomicInteger nodeIdGenerator = new AtomicInteger(); public ClusterState randomInitialClusterState() { From 9d9d4b7e12d75f39886133f216c6766e61cf7854 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 28 Jun 2017 11:39:07 -0600 Subject: [PATCH 11/11] Rewrite test, taking Yannick's feedback into account --- .../allocation/FailedNodeRoutingTests.java | 92 +++++++++---------- 1 file changed, 41 insertions(+), 51 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java index 9d12e9fcefff4..3b551e912947a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java +++ 
b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedNodeRoutingTests.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -133,10 +134,10 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()) .add(createNode()).build(); state = ClusterState.builder(state).nodes(newNodes).build(); - state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave + state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after adding node } - // Log the shard versions (for debugging if necessary) + // Log the node versions (for debugging if necessary) for (ObjectCursor cursor : state.nodes().getDataNodes().values()) { Version nodeVer = cursor.value.getVersion(); logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer); @@ -153,7 +154,6 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException state = cluster.createIndex(state, request); assertTrue(state.metaData().hasIndex(name)); } - state = cluster.reroute(state, new ClusterRerouteRequest()); ClusterState previousState = state; @@ -164,60 +164,50 @@ public void testRandomClusterPromotesNewestReplica() throws InterruptedException state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); } - logger.info("--> state before failing shards: {}", state); - for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { - if (shardRouting.primary() && randomBoolean()) { - ShardRouting replicaToBePromoted = state.getRoutingNodes() - .activeReplicaWithHighestVersion(shardRouting.shardId()); - final ClusterState currentState = state; - // List of potential candidate replicas for promotion - Set 
candidates = state.getRoutingNodes().shardsWithState(STARTED) + boolean keepGoing = true; + while (keepGoing) { + List primaries = state.getRoutingNodes().shardsWithState(STARTED) + .stream().filter(ShardRouting::primary).collect(Collectors.toList()); + + // Pick a random subset of primaries to fail + List shardsToFail = new ArrayList<>(); + List failedPrimaries = randomSubsetOf(primaries); + failedPrimaries.stream().forEach(sr -> { + shardsToFail.add(new FailedShard(randomFrom(sr), "failed primary", new Exception())); + }); + + logger.info("--> state before failing shards: {}", state); + state = cluster.applyFailedShards(state, shardsToFail); + + final ClusterState compareState = state; + failedPrimaries.forEach(shardRouting -> { + logger.info("--> verifying version for {}", shardRouting); + + ShardRouting newPrimary = compareState.routingTable().index(shardRouting.index()) + .shard(shardRouting.id()).primaryShard(); + Version newPrimaryVersion = getNodeVersion(newPrimary, compareState); + + logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary); + compareState.routingTable().shardRoutingTable(newPrimary.shardId()).shardsWithState(STARTED) .stream() - .filter(s -> !s.primary() && s.active()) - .filter(s -> s.shardId().equals(shardRouting.shardId())) - .filter(s -> !s.equals(replicaToBePromoted)) - .filter(s -> currentState.getRoutingNodes().node(s.currentNodeId()) != null) - .collect(Collectors.toSet()); - // If we find a replica and at least another candidate - if (replicaToBePromoted != null && candidates.size() > 0) { - logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); - logger.info("--> other candidates: {}", candidates); - - List shardsToFail = new ArrayList<>(); - logger.info("--> failing shard {}", shardRouting); - shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); - state = cluster.applyFailedShards(state, shardsToFail); - ShardRouting newPrimary = 
state.routingTable().index(shardRouting.index()) - .shard(shardRouting.id()).primaryShard(); - Version newPrimaryVersion = getNodeVersion(newPrimary, state); - - final ClusterState compareState = state; - logger.info("--> new primary is on version {}: {}", newPrimaryVersion, newPrimary); - List candidateVersions = candidates.stream().map(sr -> { - Version version = getNodeVersion(sr, compareState); - logger.info("--> candidate on {} node; shard routing: {}", version, sr); - return version; - }).collect(Collectors.toList()); - for (Version candidateVer : candidateVersions) { - assertTrue("candidate was not on the newest version, new primary is on " + - newPrimaryVersion + " and there is a candidate on " + candidateVer, - candidateVer.onOrBefore(newPrimaryVersion)); - } - } - } - state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING)); + .forEach(sr -> { + Version candidateVer = getNodeVersion(sr, compareState); + if (candidateVer != null) { + logger.info("--> candidate on {} node; shard routing: {}", candidateVer, sr); + assertTrue("candidate was not on the newest version, new primary is on " + + newPrimaryVersion + " and there is a candidate on " + candidateVer, + candidateVer.onOrBefore(newPrimaryVersion)); + } + }); + }); + + keepGoing = randomBoolean(); } terminate(threadPool); } private static Version getNodeVersion(ShardRouting shardRouting, ClusterState state) { - for (ObjectObjectCursor entry : state.getNodes().getDataNodes()) { - if (entry.key.equals(shardRouting.currentNodeId())) { - return state.getRoutingNodes().node(entry.key).node().getVersion(); - } - } - fail("shard is not assigned to a node"); - return null; + return Optional.ofNullable(state.getNodes().get(shardRouting.currentNodeId())).map(DiscoveryNode::getVersion).orElse(null); } private static final AtomicInteger nodeIdGenerator = new AtomicInteger();