From 24f29eec3c7a220feeb44ba0aa5d35a9ce0ce9dd Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Fri, 17 Sep 2021 23:04:48 +0530 Subject: [PATCH] Review comments Signed-off-by: Bukhtawar Khan --- .../NodeLoadAwareAllocationDecider.java | 22 +- .../NodeLoadAwareAllocationTests.java | 500 ++++++------------ 2 files changed, 168 insertions(+), 354 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java index 1250174ba66ae..4262a27d99d6e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/NodeLoadAwareAllocationDecider.java @@ -25,7 +25,9 @@ /** * This {@link NodeLoadAwareAllocationDecider} controls shard over-allocation * due to node failures or otherwise on the surviving nodes. The allocation limits - * are decided by the user provisioned capacity, to determine if there were lost nodes + * are decided by the user provisioned capacity, to determine if there were lost nodes. + * The provisioned capacity as defined by the below settings needs to updated one every + * cluster scale up and scale down operations. *
  * cluster.routing.allocation.overload_awareness.provisioned_capacity: N
  * 
@@ -38,7 +40,10 @@ * The total limit per node based on skew_factor doesn't limit primaries that previously * existed on the disk as those shards are force allocated by * {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)} - * however new primaries due to index creation, snapshot restore etc can be controlled via the below settings + * however new primaries due to index creation, snapshot restore etc can be controlled via the below settings. + * Setting the value to true allows newly created primaries to get assigned while preventing the replica allocation + * breaching the skew factor. + * Note that setting this to false can result in the primaries not get assigned and the cluster turning RED *
  * cluster.routing.allocation.load_awareness.allow_unassigned_primaries
  * 
@@ -50,8 +55,8 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider { public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING = Setting.intSetting("cluster.routing.allocation.load_awareness.provisioned_capacity", -1, -1, Property.Dynamic, Property.NodeScope); - public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING = - Setting.intSetting("cluster.routing.allocation.load_awareness.skew_factor", 50, -1, Property.Dynamic, + public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING = + Setting.doubleSetting("cluster.routing.allocation.load_awareness.skew_factor", 50, -1, Property.Dynamic, Property.NodeScope); public static final Setting CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING = Setting.boolSetting("cluster.routing.allocation.load_awareness.allow_unassigned_primaries", @@ -59,7 +64,7 @@ public class NodeLoadAwareAllocationDecider extends AllocationDecider { private volatile int provisionedCapacity; - private volatile int skewFactor; + private volatile double skewFactor; private volatile boolean allowUnassignedPrimaries; @@ -81,7 +86,7 @@ private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) { this.allowUnassignedPrimaries = allowUnassignedPrimaries; } - private void setSkewFactor(int skewFactor) { + private void setSkewFactor(double skewFactor) { this.skewFactor = skewFactor; } @@ -120,9 +125,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout logger.debug(() -> new ParameterizedMessage("Too many shards [{}] allocated to this node [{}]. Expected average shards" + " per node [{}], overload factor [{}], node limit [{}]", nodeShardCount, node.nodeId(), expectedAvgShardsPerNode, skewFactor, limit)); - return allocation.decision(Decision.NO, NAME, - "too many shards [%d] allocated to this node, limit per node [%d] for overload factor [%s] based on capacity [%d]", - nodeShardCount, limit, skewFactor, provisionedCapacity); + return allocation.decision(Decision.NO, NAME, "too many shards [%d] allocated to this node, limit per node [%d] considering" + + " overload factor [%.2f] based on capacity [%d]", nodeShardCount, limit, skewFactor, provisionedCapacity); } return allocation.decision(Decision.YES, NAME, "node meets all skew awareness attribute requirements"); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java index 872f2b266111c..4ecfe3fd1cbda 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeLoadAwareAllocationTests.java @@ -29,6 +29,8 @@ import org.opensearch.gateway.GatewayAllocator; import org.opensearch.test.gateway.TestGatewayAllocator; +import java.util.Map; + import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.sameInstance; @@ -41,19 +43,13 @@ public class NodeLoadAwareAllocationTests extends OpenSearchAllocationTestCase { private final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationTests.class); public void testNewUnassignedPrimaryAllocationOnOverload() { - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),5) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - true) - .build()); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true) + ); logger.info("Building initial routing table for 'testNewUnassignedPrimaryAllocationOnOverload'"); - Metadata metadata = Metadata.builder() .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(20).numberOfReplicas(0)) .build(); @@ -65,27 +61,17 @@ public void testNewUnassignedPrimaryAllocationOnOverload() { ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); - logger.info("--> adding three nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone_1"))) - .add(newNode("node2", singletonMap("zone", "zone_1"))) - .add(newNode("node3", singletonMap("zone", "zone_1"))) - .add(newNode("node4", singletonMap("zone", "zone_1"))) - .add(newNode("node5", singletonMap("zone", "zone_1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); logger.info("--> start the shards (primaries)"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); - logger.info("--> Remove node from zone holding primaries"); - ClusterState newState = removeNode(clusterState, "node1", strategy); - logger.info("--> Remove node from zone holding primaries"); - newState = removeNode(newState, "node2", strategy); - logger.info("--> Remove node from zone holding primaries"); - newState = removeNode(newState, "node3", strategy); + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3"); logger.info("add another index with 20 shards"); metadata = Metadata.builder(newState.metadata()) @@ -142,16 +128,11 @@ public void testNewUnassignedPrimaryAllocationOnOverload() { } public void testNoAllocationLimitsOnOverloadForDisabledLoadFactor() { - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),5) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), -1) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - false) - .build()); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), -1, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false) + ); logger.info("Building initial routing table for 'testNoAllocationLimitsOnOverloadForDisabledLoadFactor'"); @@ -166,27 +147,17 @@ public void testNoAllocationLimitsOnOverloadForDisabledLoadFactor() { ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); - logger.info("--> adding three nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone_1"))) - .add(newNode("node2", singletonMap("zone", "zone_1"))) - .add(newNode("node3", singletonMap("zone", "zone_1"))) - .add(newNode("node4", singletonMap("zone", "zone_1"))) - .add(newNode("node5", singletonMap("zone", "zone_1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); logger.info("--> start the shards (primaries)"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); - logger.info("--> Remove node from zone holding primaries"); - ClusterState newState = removeNode(clusterState, "node1", strategy); - logger.info("--> Remove node from zone holding primaries"); - newState = removeNode(newState, "node2", strategy); - logger.info("--> Remove node from zone holding primaries"); - newState = removeNode(newState, "node3", strategy); + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3" ); logger.info("add another index with 20 shards"); metadata = Metadata.builder(newState.metadata()) @@ -246,16 +217,11 @@ public void testNoAllocationLimitsOnOverloadForDisabledLoadFactor() { public void testExistingPrimariesAllocationOnOverload() { GatewayAllocator gatewayAllocator = new TestGatewayAllocator(); - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 50) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - false) - .build(), gatewayAllocator); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 50, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false), + gatewayAllocator); logger.info("Building initial routing table for 'testExistingPrimariesAllocationOnOverload'"); @@ -270,27 +236,16 @@ public void testExistingPrimariesAllocationOnOverload() { ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); - logger.info("--> adding three nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone_1"))) - .add(newNode("node2", singletonMap("zone", "zone_1"))) - .add(newNode("node3", singletonMap("zone", "zone_1"))) - .add(newNode("node4", singletonMap("zone", "zone_1"))) - .add(newNode("node5", singletonMap("zone", "zone_1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + logger.info("--> adding five nodes on same zone and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); logger.info("--> start the shards (primaries)"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(20)); - logger.info("--> Remove node1 from zone holding primaries"); - ClusterState newState = removeNode(clusterState, "node1", strategy); - logger.info("--> Remove node2 from zone holding primaries"); - newState = removeNode(newState, "node2", strategy); - logger.info("--> Remove node3 from zone holding primaries"); - newState = removeNode(newState, "node3", strategy); + logger.info("--> Remove nodes from zone holding primaries"); + ClusterState newState = removeNodes(clusterState, strategy, "node1", "node2", "node3"); assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(8)); @@ -323,20 +278,14 @@ public void testExistingPrimariesAllocationOnOverload() { assertThat(newState.getRoutingNodes().node("node4").size(), equalTo(12)); logger.info("--> Remove node4 from zone holding primaries"); - newState = removeNode(newState, "node4", strategy); + newState = removeNodes(newState, strategy,"node4"); logger.info("--> change the overload load factor to zero and verify if unassigned primaries on disk get assigned despite overload"); - strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5) - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 0) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - false) - .build(), gatewayAllocator); + strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 0, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false), + gatewayAllocator); newState = strategy.reroute(newState, "reroute"); @@ -392,16 +341,11 @@ public void testExistingPrimariesAllocationOnOverload() { public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { GatewayAllocator gatewayAllocator = new TestGatewayAllocator(); - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),5) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - true) - .build(), gatewayAllocator); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true), + gatewayAllocator); logger.info("Building initial routing table for 'testSingleZoneOneReplicaLimitsShardAllocationOnOverload'"); @@ -417,14 +361,7 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); logger.info("--> adding five nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone_1"))) - .add(newNode("node2", singletonMap("zone", "zone_1"))) - .add(newNode("node3", singletonMap("zone", "zone_1"))) - .add(newNode("node4", singletonMap("zone", "zone_1"))) - .add(newNode("node5", singletonMap("zone", "zone_1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3", "node4", "node5"); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); logger.info("--> start the shards (primaries)"); @@ -436,8 +373,7 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(40)); logger.info("--> Remove node1 from zone"); - ClusterState newState = removeNode(clusterState, "node1", strategy); - newState = strategy.reroute(newState, "reroute"); + ClusterState newState = removeNodes(clusterState, strategy, "node1"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); @@ -450,7 +386,7 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { } logger.info("--> Remove node2 when the limit of overload is reached"); - newState = removeNode(newState, "node2", strategy); + newState = removeNodes(newState, strategy, "node2"); newState = strategy.reroute(newState, "reroute"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { @@ -505,16 +441,11 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(20)); logger.info("change settings to allow unassigned primaries"); - strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - false) - .build(), gatewayAllocator); + strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), false), + gatewayAllocator); for (RoutingNode node : newState.getRoutingNodes()) { assertThat(node.size(), equalTo(40)); @@ -542,17 +473,12 @@ public void testSingleZoneOneReplicaLimitsShardAllocationOnOverload() { assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).stream().filter(x -> x.primary()).count(), equalTo(5L)); } - public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload(){ - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),15) - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put("cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20) - .build()); + public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + "cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") + ); logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsShardAllocationOnOverload'"); @@ -568,14 +494,7 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload(){ .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); logger.info("--> adding five nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone1"))) - .add(newNode("node2", singletonMap("zone", "zone1"))) - .add(newNode("node3", singletonMap("zone", "zone1"))) - .add(newNode("node4", singletonMap("zone", "zone1"))) - .add(newNode("node5", singletonMap("zone", "zone1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); logger.info("--> start the shards (primaries)"); @@ -588,14 +507,7 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload(){ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(40)); logger.info("--> add five new node in new zone and reroute"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node6", singletonMap("zone", "zone2"))) - .add(newNode("node7", singletonMap("zone", "zone2"))) - .add(newNode("node8", singletonMap("zone", "zone2"))) - .add(newNode("node9", singletonMap("zone", "zone2"))) - .add(newNode("node10", singletonMap("zone", "zone2"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(20)); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(20)); @@ -610,14 +522,8 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload(){ logger.info("--> add another five node in new zone and reroute"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node11", singletonMap("zone", "zone3"))) - .add(newNode("node12", singletonMap("zone", "zone3"))) - .add(newNode("node13", singletonMap("zone", "zone3"))) - .add(newNode("node14", singletonMap("zone", "zone3"))) - .add(newNode("node15", singletonMap("zone", "zone3"))) - ).build(); - ClusterState newState = strategy.reroute(clusterState, "reroute"); + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); } @@ -630,11 +536,8 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload(){ assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); logger.info("--> Remove three node from zone3 holding primary and replicas"); - //remove one nodes in one zone to cause distribution zone1->5 , zone2->5, zone3->4 - newState = removeNode(newState, "node11", strategy); - newState = removeNode(newState, "node12", strategy); - newState = removeNode(newState, "node13", strategy); - newState = strategy.reroute(newState, "reroute"); + + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); @@ -644,30 +547,21 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload(){ assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); //add the removed node - newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) - .add(newNode("node11", singletonMap("zone", "zone3")))) - .build(); - newState = strategy.reroute(newState, "reroute"); + newState = addNodes(newState, strategy, "zone3", "node11"); assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); newState = startInitializingShardsAndReroute(strategy, newState); assertThat(newState.getRoutingNodes().node("node11").size(), equalTo(5)); //add the removed node - newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) - .add(newNode("node12", singletonMap("zone", "zone3")))) - .build(); - newState = strategy.reroute(newState, "reroute"); + newState = addNodes(newState, strategy, "zone3", "node12"); assertThat(newState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(5)); newState = startInitializingShardsAndReroute(strategy, newState); assertThat(newState.getRoutingNodes().node("node12").size(), equalTo(5)); //add the removed node - newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) - .add(newNode("node13", singletonMap("zone", "zone3")))) - .build(); - newState = strategy.reroute(newState, "reroute"); + newState = addNodes(newState, strategy, "zone3", "node13"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); @@ -680,19 +574,13 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverload(){ assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); } - public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload(){ - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),15) - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put("cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - true) - .build()); + public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload() { + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true, + "cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") + ); logger.info("Building initial routing table for 'testThreeZoneOneReplicaLimitsShardAllocationOnOverload'"); @@ -708,14 +596,7 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload(){ .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); logger.info("--> adding five nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone1"))) - .add(newNode("node2", singletonMap("zone", "zone1"))) - .add(newNode("node3", singletonMap("zone", "zone1"))) - .add(newNode("node4", singletonMap("zone", "zone1"))) - .add(newNode("node5", singletonMap("zone", "zone1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(30)); logger.info("--> start the shards (primaries)"); @@ -728,14 +609,7 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload(){ assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(30)); logger.info("--> add five new node in new zone and reroute"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node6", singletonMap("zone", "zone2"))) - .add(newNode("node7", singletonMap("zone", "zone2"))) - .add(newNode("node8", singletonMap("zone", "zone2"))) - .add(newNode("node9", singletonMap("zone", "zone2"))) - .add(newNode("node10", singletonMap("zone", "zone2"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(30)); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(25)); @@ -750,14 +624,7 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload(){ logger.info("--> add another five node in new zone and reroute"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node11", singletonMap("zone", "zone3"))) - .add(newNode("node12", singletonMap("zone", "zone3"))) - .add(newNode("node13", singletonMap("zone", "zone3"))) - .add(newNode("node14", singletonMap("zone", "zone3"))) - .add(newNode("node15", singletonMap("zone", "zone3"))) - ).build(); - ClusterState newState = strategy.reroute(clusterState, "reroute"); + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); } @@ -770,11 +637,8 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload(){ assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); logger.info("--> Remove three node from zone3"); - //remove one nodes in one zone to cause distribution zone1->5 , zone2->5, zone3->4 - newState = removeNode(newState, "node11", strategy); - newState = removeNode(newState, "node12", strategy); - newState = removeNode(newState, "node13", strategy); - newState = strategy.reroute(newState, "reroute"); + + newState = removeNodes(newState, strategy, "node11", "node12", "node13"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); @@ -783,23 +647,8 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload(){ assertThat(newState.getRoutingNodes().node("node14").size(), equalTo(5)); assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(5)); - //add the removed node - newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) - .add(newNode("node11", singletonMap("zone", "zone3")))) - .build(); - newState = strategy.reroute(newState, "reroute"); - - //add the removed node - newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) - .add(newNode("node12", singletonMap("zone", "zone3")))) - .build(); - newState = strategy.reroute(newState, "reroute"); - - //add the removed node - newState = ClusterState.builder(newState).nodes(DiscoveryNodes.builder(newState.nodes()) - .add(newNode("node13", singletonMap("zone", "zone3")))) - .build(); - newState = strategy.reroute(newState, "reroute"); + //add the removed nodes + newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); @@ -813,16 +662,11 @@ public void testThreeZoneOneReplicaLimitsShardAllocationOnOverload(){ } public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() { - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 21) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 21) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 21) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),9) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10) - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put("cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") - .build()); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 9, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10, + "cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3") + ); logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones'"); @@ -838,12 +682,7 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); logger.info("--> adding three nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone_1"))) - .add(newNode("node2", singletonMap("zone", "zone_1"))) - .add(newNode("node3", singletonMap("zone", "zone_1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone_1", "node1", "node2", "node3"); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(21)); logger.info("--> start the shards (primaries)"); @@ -854,12 +693,7 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); logger.info("--> add three new node with a new rack and reroute"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node4", singletonMap("zone", "zone_2"))) - .add(newNode("node5", singletonMap("zone", "zone_2"))) - .add(newNode("node6", singletonMap("zone", "zone_2"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone_2", "node4", "node5", "node6"); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(21)); assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(21)); @@ -876,23 +710,16 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() logger.info("--> add another node with a new rack, make sure nothing moves"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node7", singletonMap("zone", "zone_3"))) - .add(newNode("node8", singletonMap("zone", "zone_3"))) - .add(newNode("node9", singletonMap("zone", "zone_3"))) - ).build(); - ClusterState newState = strategy.reroute(clusterState, "reroute"); + ClusterState newState = addNodes(clusterState, strategy, "zone_3", "node7", "node8", "node9"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); } assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(63)); - logger.info("--> Remove one node from zone1"); + logger.info("--> Remove two nodes from zones"); //remove one nodes in one zone to cause distribution zone1->2 , zone2->3, zone3->2 - newState = removeNode(newState, "node7", strategy); - logger.info("--> Remove another node from zones2"); - newState = removeNode(newState, "node2", strategy); - newState = strategy.reroute(newState, "reroute"); + newState = removeNodes(newState, strategy, "node7", "node2"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); } @@ -921,14 +748,10 @@ public void testThreeZoneTwoReplicaLimitsShardAllocationOnOverloadAcrossZones() } public void testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload() { - AllocationService strategy = createAllocationService(Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),3) - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10) - .build()); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 3, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10) + ); logger.info("Building initial routing table for 'testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload'"); @@ -943,13 +766,9 @@ public void testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload() { ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); - logger.info("--> adding two nodes on same rack and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone1"))) - .add(newNode("node2", singletonMap("zone", "zone1"))) - .add(newNode("node3", singletonMap("zone", "zone1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + logger.info("--> adding three nodes on same rack and do rerouting"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(3)); logger.info("--> start the shards (primaries)"); @@ -969,8 +788,7 @@ public void testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload() { assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); //remove one node to make zone1 skewed - clusterState = removeNode(clusterState, randomFrom("node1", "node2", "node3"), strategy); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = removeNodes(clusterState, strategy, randomFrom("node1", "node2", "node3")); while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { clusterState = startInitializingShardsAndReroute(strategy, clusterState); @@ -987,17 +805,11 @@ public void testSingleZoneTwoReplicaLimitsReplicaAllocationOnOverload() { } public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { - AllocationService strategy = createAllocationService(Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),5) - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10) - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), - true) - .build()); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 5, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 10, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey(), true) + ); logger.info("Building initial routing table for 'testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload'"); @@ -1013,11 +825,7 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); logger.info("--> adding two nodes on same rack and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone1"))) - .add(newNode("node2", singletonMap("zone", "zone1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone1", "node1","node2"); //skewness limit doesn't apply to primary assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); @@ -1038,11 +846,7 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable())); //add the third and fourth node - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3", singletonMap("zone", "zone1"))) - .add(newNode("node4", singletonMap("zone", "zone1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone1", "node3", "node4"); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(18)); @@ -1056,10 +860,7 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { assertFalse(shard.primary()); } - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node5", singletonMap("zone", "zone1"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone1", "node5"); while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { clusterState = startInitializingShardsAndReroute(strategy, clusterState); @@ -1075,15 +876,9 @@ public void testSingleZoneOneReplicaLimitsReplicaAllocationOnOverload() { } public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure(){ - AllocationService strategy = createAllocationService(Settings.builder() - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) - .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(),15) - .put("cluster.routing.allocation.awareness.attributes", "zone") - .put(NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20) - .build()); + AllocationService strategy = createAllocationServiceWithAdditionalSettings(org.opensearch.common.collect.Map.of( + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey(), 15, + NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(), 20)); logger.info("Building initial routing table for 'testThreeZoneTwoReplicaLimitsUnderFullZoneFailure'"); @@ -1099,13 +894,7 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() .getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build(); logger.info("--> adding five nodes on same zone and do rerouting"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() - .add(newNode("node1", singletonMap("zone", "zone1"))) - .add(newNode("node2", singletonMap("zone", "zone1"))) - .add(newNode("node3", singletonMap("zone", "zone1"))) - .add(newNode("node4", singletonMap("zone", "zone1"))) - .add(newNode("node5", singletonMap("zone", "zone1"))) - ).build(); + clusterState = addNodes(clusterState, strategy, "zone1", "node1", "node2", "node3", "node4", "node5"); clusterState = strategy.reroute(clusterState, "reroute"); assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(20)); @@ -1113,26 +902,13 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() clusterState = startInitializingShardsAndReroute(strategy, clusterState); logger.info("--> add five new node in new zone and reroute"); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node6", singletonMap("zone", "zone2"))) - .add(newNode("node7", singletonMap("zone", "zone2"))) - .add(newNode("node8", singletonMap("zone", "zone2"))) - .add(newNode("node9", singletonMap("zone", "zone2"))) - .add(newNode("node10", singletonMap("zone", "zone2"))) - ).build(); - clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = addNodes(clusterState, strategy, "zone2", "node6", "node7", "node8", "node9", "node10"); logger.info("--> complete relocation"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node11", singletonMap("zone", "zone3"))) - .add(newNode("node12", singletonMap("zone", "zone3"))) - .add(newNode("node13", singletonMap("zone", "zone3"))) - .add(newNode("node14", singletonMap("zone", "zone3"))) - .add(newNode("node15", singletonMap("zone", "zone3"))) - ).build(); - ClusterState newState = strategy.reroute(clusterState, "reroute"); + ClusterState newState = addNodes(clusterState, strategy, "zone3", "node11", "node12", "node13", "node14", "node15"); + while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); } @@ -1146,12 +922,7 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() assertThat(newState.getRoutingNodes().node("node15").size(), equalTo(4)); logger.info("--> Remove complete zone3 holding primary and replicas"); - newState = removeNode(newState, "node11", strategy); - newState = removeNode(newState, "node12", strategy); - newState = removeNode(newState, "node13", strategy); - newState = removeNode(newState, "node14", strategy); - newState = removeNode(newState, "node15", strategy); - newState = strategy.reroute(newState, "reroute"); + newState = removeNodes(newState, strategy, "node11", "node12", "node13", "node14", "node15"); while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) { newState = startInitializingShardsAndReroute(strategy, newState); @@ -1213,8 +984,47 @@ public void testThreeZoneTwoReplicaLimitsReplicaAllocationUnderFullZoneFailure() assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); } - private ClusterState removeNode(ClusterState clusterState, String nodeName, AllocationService allocationService) { + private ClusterState removeNodes(ClusterState clusterState, AllocationService allocationService, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.getNodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.remove(nodeId)); return allocationService.disassociateDeadNodes(ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder(clusterState.getNodes()).remove(nodeName)).build(), true, "reroute"); + .nodes(nodeBuilder).build(), true, "reroute"); + } + + private ClusterState addNodes(ClusterState clusterState, AllocationService allocationService, String zone, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newNode(nodeId, singletonMap("zone", zone)))); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return allocationService.reroute(clusterState, "reroute"); + } + + private AllocationService createAllocationServiceWithAdditionalSettings(Map settingsValue) { + return createAllocationService(buildSettings(settingsValue)); + } + + private AllocationService createAllocationServiceWithAdditionalSettings(Map settingsValue, + GatewayAllocator gatewayAllocator) { + return createAllocationService(buildSettings(settingsValue), gatewayAllocator); + } + + private Settings buildSettings(Map settingsValue) { + Settings.Builder settingsBuilder = Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 20) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put("cluster.routing.allocation.awareness.attributes", "zone"); + settingsValue.forEach((k, v) -> { + if (v instanceof Integer) + settingsBuilder.put(k, (Integer)(v)); + else if (v instanceof Boolean) + settingsBuilder.put(k, (Boolean)(v)); + else if (v instanceof String) + settingsBuilder.put(k, (String)(v)); + else { + throw new UnsupportedOperationException("Unsupported type for key :" + k); + } + }); + return settingsBuilder.build(); } }