Skip to content

Commit

Permalink
Initial changes to handle skewness
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar committed Aug 24, 2021
1 parent 5ae0045 commit fdd86f6
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@

package org.opensearch.cluster.routing.allocation.decider;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import com.carrotsearch.hppc.ObjectIntHashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
Expand All @@ -47,6 +52,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;

import static java.util.Collections.emptyList;

Expand Down Expand Up @@ -99,17 +105,41 @@ public class AwarenessAllocationDecider extends AllocationDecider {
Property.NodeScope);
public static final Setting<Settings> CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING =
Setting.groupSetting("cluster.routing.allocation.awareness.force.", Property.Dynamic, Property.NodeScope);
public static final Setting<Settings> CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_CAPACITY_GROUP_SETTING =
Setting.groupSetting("cluster.routing.allocation.awareness.attribute.", Property.Dynamic, Property.NodeScope);
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCED_ALLOCATION_DISABLE_SETTING =
Setting.boolSetting("cluster.routing.allocation.awareness.forced_allocation.disable", false,
Property.Dynamic, Property.NodeScope);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_AWARENESS_SKEWNESS_LIMIT =
Setting.intSetting("cluster.routing.allocation.awareness.skewness.limit", 10, Property.Dynamic, Property.NodeScope);

private volatile List<String> awarenessAttributes;

private volatile Map<String, List<String>> forcedAwarenessAttributes;

private volatile Map<String, Integer> awarenessAttributeCapacities;

private volatile boolean disableForcedAllocation;

private volatile int skewnessLimit;

private static final Logger logger = LogManager.getLogger(AwarenessAllocationDecider.class);

/**
 * Creates the awareness decider from the initial node {@code settings} and registers
 * consumers on {@code clusterSettings} so the volatile fields track dynamic updates.
 *
 * Fix: the registration for {@code CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING}
 * contained a duplicated {@code this::setForcedAwarenessAttributes);} line (stray diff
 * artifact) that made the constructor syntactically invalid; the duplicate is removed.
 *
 * @param settings        initial node settings used to seed attributes, capacities,
 *                        skewness limit and the forced-allocation disable flag
 * @param clusterSettings registry used to subscribe to dynamic setting changes
 */
public AwarenessAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
    this.awarenessAttributes = CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.get(settings);
    this.disableForcedAllocation = CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCED_ALLOCATION_DISABLE_SETTING.get(settings);
    this.skewnessLimit = CLUSTER_ROUTING_ALLOCATION_AWARENESS_SKEWNESS_LIMIT.get(settings);
    clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_SKEWNESS_LIMIT,
        this::setSkewnessLimit);
    clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, this::setAwarenessAttributes);
    setForcedAwarenessAttributes(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.get(settings));
    setAwarenessAttributeCapacities(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_CAPACITY_GROUP_SETTING.get(settings));
    clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
        this::setForcedAwarenessAttributes);
    clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_CAPACITY_GROUP_SETTING,
        this::setAwarenessAttributeCapacities);
    clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCED_ALLOCATION_DISABLE_SETTING,
        this::setDisableForcedAllocation);
}

private void setForcedAwarenessAttributes(Settings forceSettings) {
Expand All @@ -124,10 +154,28 @@ private void setForcedAwarenessAttributes(Settings forceSettings) {
this.forcedAwarenessAttributes = forcedAwarenessAttributes;
}

// Dynamic-setting consumer for "cluster.routing.allocation.awareness.skewness.limit";
// the field is volatile so readers on other threads observe the update.
private void setSkewnessLimit(int skewnessLimit) {
this.skewnessLimit = skewnessLimit;
}

/**
 * Rebuilds the attribute-name -> capacity map from the
 * "cluster.routing.allocation.awareness.attribute.*" group settings.
 * A group with no "capacity" key maps to -1, which downstream logic
 * treats as "capacity not configured".
 */
private void setAwarenessAttributeCapacities(Settings awarenessCapacitySettings) {
    final Map<String, Integer> capacitiesByAttribute = new HashMap<>();
    awarenessCapacitySettings.getAsGroups().forEach(
        (attribute, attributeSettings) -> capacitiesByAttribute.put(attribute, attributeSettings.getAsInt("capacity", -1)));
    this.awarenessAttributeCapacities = capacitiesByAttribute;
}

// Dynamic-setting consumer for "cluster.routing.allocation.awareness.attributes";
// replaces the whole list atomically (volatile write).
private void setAwarenessAttributes(List<String> awarenessAttributes) {
this.awarenessAttributes = awarenessAttributes;
}

// Dynamic-setting consumer for "cluster.routing.allocation.awareness.forced_allocation.disable";
// when true, underCapacity() additionally rejects allocation to skewed attribute values.
private void setDisableForcedAllocation(boolean disableForcedAllocation) {
this.disableForcedAllocation = disableForcedAllocation;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true);
Expand Down Expand Up @@ -159,6 +207,29 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
// build attr_value -> nodes map
ObjectIntHashMap<String> nodesPerAttribute = allocation.routingNodes().nodesPerAttributesCounts(awarenessAttribute);

if (disableForcedAllocation) {
//the current node attribute value under consideration
String nodeAttributeValue = node.node().getAttributes().get(awarenessAttribute);
Set<String> skewedAttributeValues = null;
try {
skewedAttributeValues = skewedNodesPerAttributeValue(nodesPerAttribute, awarenessAttribute);
} catch (IllegalStateException e) {
logger.warn(() -> new ParameterizedMessage("Inconsistent configuration to decide on skewness for attribute " +
"[{}] due to ", awarenessAttribute) , e);
}
if (skewedAttributeValues != null && skewedAttributeValues.contains(nodeAttributeValue)) {
//the current attribute value has nodes that are skewed
return allocation.decision(Decision.NO, NAME,
"there are too many copies of the shard allocated to nodes with attribute [%s], due to skewed distribution of " +
"nodes for attribute value [%s] expected the nodes for this attribute to be [%d] but found nodes per " +
"attribute to be [%d]",
awarenessAttribute,
nodeAttributeValue,
awarenessAttributeCapacities.get(awarenessAttribute),
nodesPerAttribute.get(awarenessAttribute));
}
}

// build the count of shards per attribute value
ObjectIntHashMap<String> shardPerAttribute = new ObjectIntHashMap<>();
for (ShardRouting assignedShard : allocation.routingNodes().assignedShards(shardRouting.shardId())) {
Expand All @@ -176,7 +247,7 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
if (node.nodeId().equals(nodeId) == false) {
// we work on different nodes, move counts around
shardPerAttribute.putOrAdd(allocation.routingNodes().node(nodeId).node().getAttributes().get(awarenessAttribute),
0, -1);
0, -1);
shardPerAttribute.addTo(node.node().getAttributes().get(awarenessAttribute), 1);
}
} else {
Expand All @@ -199,17 +270,56 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
final int maximumNodeCount = (shardCount + numberOfAttributes - 1) / numberOfAttributes; // ceil(shardCount/numberOfAttributes)
if (currentNodeCount > maximumNodeCount) {
return allocation.decision(Decision.NO, NAME,
"there are too many copies of the shard allocated to nodes with attribute [%s], there are [%d] total configured " +
"there are too many copies of the shard allocated to nodes with attribute [%s], there are [%d] total configured " +
"shard copies for this shard id and [%d] total attribute values, expected the allocated shard count per " +
"attribute [%d] to be less than or equal to the upper bound of the required number of shards per attribute [%d]",
awarenessAttribute,
shardCount,
numberOfAttributes,
currentNodeCount,
maximumNodeCount);
awarenessAttribute,
shardCount,
numberOfAttributes,
currentNodeCount,
maximumNodeCount);
}
}

return allocation.decision(Decision.YES, NAME, "node meets all awareness attribute requirements");
}

/**
 * Determines which values of {@code awarenessAttribute} are currently "skewed", i.e. have
 * fewer live nodes than {@code ceil((1 - skewnessLimit/100) * capacity)} while every forced
 * attribute value still has at least one node.
 *
 * Fix: the {@link IllegalStateException} messages were built without separating spaces
 * (e.g. "value :Xexpected : 3found :2"); the messages are now readable. Logic, ordering
 * and the null-vs-empty return contract are unchanged (the caller null-checks the result).
 *
 * @param nodesPerAttribute  live node count per attribute value, from the routing nodes
 * @param awarenessAttribute the awareness attribute being evaluated
 * @return the skewed attribute values; an empty set when forced awareness/capacity is not
 *         configured or an entire attribute value has failed; {@code null} when nothing is skewed
 * @throws IllegalStateException when the forced attribute values, the configured capacity and
 *         the discovered nodes are mutually inconsistent
 */
private Set<String> skewedNodesPerAttributeValue(ObjectIntHashMap<String> nodesPerAttribute, String awarenessAttribute) {
    Set<String> underCapacityAttributeValues = null;
    int capacity = awarenessAttributeCapacities.getOrDefault(awarenessAttribute, -1);
    if (forcedAwarenessAttributes.containsKey(awarenessAttribute) == false || capacity <= 0) {
        // forced awareness is not enabled for this attribute, or no capacity configured
        return Collections.emptySet();
    }
    List<String> forcedAwarenessAttribute = forcedAwarenessAttributes.get(awarenessAttribute);
    if (forcedAwarenessAttribute.size() > nodesPerAttribute.size()) {
        // at least one attribute value has no nodes at all (complete failure) — skewness does not apply
        return Collections.emptySet();
    } else if (forcedAwarenessAttribute.size() == nodesPerAttribute.size()) {
        // a value is skewed once its node count drops below this threshold
        int minimumNodesBeforeSkewness = (int) Math.ceil((1 - skewnessLimit / 100.0) * capacity);
        for (String attributeValue : forcedAwarenessAttribute) {
            if (nodesPerAttribute.containsKey(attributeValue) == false) {
                // forced attribute values and discovered nodes have a mismatch
                throw new IllegalStateException("Missing attribute value in discovered nodes: " + attributeValue);
            } else if (nodesPerAttribute.get(attributeValue) < minimumNodesBeforeSkewness) {
                if (underCapacityAttributeValues == null) {
                    underCapacityAttributeValues = Sets.newHashSet(attributeValue);
                } else {
                    underCapacityAttributeValues.add(attributeValue);
                }
            } else if (nodesPerAttribute.get(attributeValue) > capacity) {
                throw new IllegalStateException("Unexpected capacity for attribute value: " + attributeValue
                    + ", expected: " + capacity + ", found: " + nodesPerAttribute.get(attributeValue));
            }
        }
        if (underCapacityAttributeValues != null && underCapacityAttributeValues.size() == forcedAwarenessAttribute.size()
            && forcedAwarenessAttribute.size() != 1) {
            // every value below threshold at once means the configured capacity itself is wrong
            throw new IllegalStateException("Unexpected capacity for attribute: " + awarenessAttribute
                + ", capacity: " + capacity);
        }
    } else {
        throw new IllegalStateException("Mismatch between forced awareness attribute: " + forcedAwarenessAttributes
            + " and discovered nodes " + nodesPerAttribute);
    }
    return underCapacityAttributeValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME,
TransportClient.CLIENT_TRANSPORT_SNIFF,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_CAPACITY_GROUP_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_SKEWNESS_LIMIT,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCED_ALLOCATION_DISABLE_SETTING,
BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
Expand Down Expand Up @@ -901,4 +904,101 @@ public void testMultipleAwarenessAttributes() {
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
}

/**
 * Verifies that with forced allocation disabled
 * ("cluster.routing.allocation.awareness.forced_allocation.disable" = true) and a per-zone
 * capacity of 3, losing nodes in the zones does not overload the remaining minority zone:
 * shards that would exceed the skewed zone stay UNASSIGNED instead of piling on.
 * Scenario: 21 shards x (1 primary + 2 replicas) across three zones of 3 nodes each.
 */
public void testDisabledForcedAllocationPreventsOverload() {
AllocationService strategy = createAllocationService(Settings.builder()
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 21)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 21)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 21)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCED_ALLOCATION_DISABLE_SETTING.getKey(), true)
.put("cluster.routing.allocation.awareness.attribute.zone.capacity", 3)
.put("cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3")
.put("cluster.routing.allocation.awareness.attributes", "zone")
.build());

logger.info("Building initial routing table for 'fullAwareness1'");

// 21 shards, 2 replicas each -> 63 shard copies when fully allocated across 3 zones
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(21).numberOfReplicas(2))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder()
.addAsNew(metadata.index("test"))
.build();

ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING
.getDefault(Settings.EMPTY)).metadata(metadata).routingTable(initialRoutingTable).build();

logger.info("--> adding three nodes on same zone and do rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.add(newNode("node1", singletonMap("zone", "zone_1")))
.add(newNode("node2", singletonMap("zone", "zone_1")))
.add(newNode("node3", singletonMap("zone", "zone_1")))
).build();
clusterState = strategy.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(21));

logger.info("--> start the shards (primaries)");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);

// only primaries can start: replicas need a second attribute value
logger.info("--> replica will not start because we have only one rack value");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(21));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));

logger.info("--> add three new node with a new rack and reroute");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.add(newNode("node4", singletonMap("zone", "zone_2")))
.add(newNode("node5", singletonMap("zone", "zone_2")))
.add(newNode("node6", singletonMap("zone", "zone_2")))
).build();
clusterState = strategy.reroute(clusterState, "reroute");

// first replica set (21 copies) starts initializing in the new zone
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(21));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(21));
assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
equalTo("node4"));

logger.info("--> complete relocation");
clusterState = startInitializingShardsAndReroute(strategy, clusterState);

assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(42));

logger.info("--> do another reroute, make sure nothing moves");
assertThat(strategy.reroute(clusterState, "reroute").routingTable(), sameInstance(clusterState.routingTable()));

logger.info("--> add another node with a new rack, make sure nothing moves");

clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.add(newNode("node7", singletonMap("zone", "zone_3")))
.add(newNode("node8", singletonMap("zone", "zone_3")))
.add(newNode("node9", singletonMap("zone", "zone_3")))
).build();
ClusterState newState = strategy.reroute(clusterState, "reroute");
// drain all in-flight recoveries so every copy (3 zones x 21) reaches STARTED
while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
newState = startInitializingShardsAndReroute(strategy, newState);
}
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(63));

logger.info("--> Remove random node from zones holding all primary and all replicas");
//remove two nodes in one zone to cause distribution zone1->3 , zone2->3, zone3->1
newState = removeNode(newState, randomFrom("node1", "node7" ), strategy);
logger.info("--> Remove another random node from zones holding all primary and all replicas");
newState = removeNode(newState, randomFrom("node2", "node8" ), strategy);
newState = strategy.reroute(newState, "reroute");
while (newState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false) {
newState = startInitializingShardsAndReroute(strategy, newState);
}
//ensure minority zone doesn't get overloaded
// 63 total copies - 14 that would overload the 1-node zone = 49 started; the 14 stay unassigned
assertThat(newState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(49));
assertThat(newState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(14));
for (ShardRouting shard : newState.getRoutingNodes().shardsWithState(UNASSIGNED)) {
assertEquals(shard.unassignedInfo().getReason(), UnassignedInfo.Reason.NODE_LEFT);
}
}

/**
 * Drops {@code nodeName} from the cluster state and asks the allocation service to
 * disassociate (fail/reroute) any shards that were assigned to the removed node.
 */
private ClusterState removeNode(ClusterState clusterState, String nodeName, AllocationService allocationService) {
    final DiscoveryNodes.Builder remainingNodes = DiscoveryNodes.builder(clusterState.getNodes()).remove(nodeName);
    final ClusterState stateWithoutNode = ClusterState.builder(clusterState).nodes(remainingNodes).build();
    return allocationService.disassociateDeadNodes(stateWithoutNode, true, "reroute");
}
}

0 comments on commit fdd86f6

Please sign in to comment.