Skip to content

Commit

Permalink
Handle shard over allocation during partial zone/rack or independent …
Browse files Browse the repository at this point in the history
…node failures (opensearch-project#1149)

The changes ensure that in the event of a partial zone failure, the surviving nodes in the minority zone don't get overloaded with shards, this is governed by a skewness limit.

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar committed Sep 22, 2021
1 parent 7135eaf commit 43052f4
Show file tree
Hide file tree
Showing 5 changed files with 1,173 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
Expand Down Expand Up @@ -256,6 +257,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Setting.Property;

import java.util.function.BiPredicate;

/**
* This {@link NodeLoadAwareAllocationDecider} controls shard over-allocation
* due to node failures or otherwise on the surviving nodes. The allocation limits
* are decided by the user provisioned capacity, to determine if there were lost nodes.
* The provisioned capacity as defined by the below settings needs to be updated on every
* cluster scale up and scale down operations.
* <pre>
* cluster.routing.allocation.overload_awareness.provisioned_capacity: N
* </pre>
* <p>
* and prevent allocation on the surviving nodes of the under capacity cluster
* based on overload factor defined as a percentage by
* <pre>
* cluster.routing.allocation.load_awareness.skew_factor: X
* </pre>
* The total limit per node based on skew_factor doesn't limit primaries that previously
* existed on the disk as those shards are force allocated by
* {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
* however new primaries due to index creation, snapshot restore etc can be controlled via the below settings.
* Setting the value to true allows newly created primaries to get assigned while preventing the replica allocation
* breaching the skew factor.
* Note that setting this to false can result in the primaries not get assigned and the cluster turning RED
* <pre>
* cluster.routing.allocation.load_awareness.allow_unassigned_primaries
* </pre>
*/
public class NodeLoadAwareAllocationDecider extends AllocationDecider {

public static final String NAME = "load_awareness";

public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING =
Setting.intSetting("cluster.routing.allocation.load_awareness.provisioned_capacity", -1, -1,
Property.Dynamic, Property.NodeScope);
public static final Setting<Double> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING =
Setting.doubleSetting("cluster.routing.allocation.load_awareness.skew_factor", 50, -1, Property.Dynamic,
Property.NodeScope);
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING =
Setting.boolSetting("cluster.routing.allocation.load_awareness.allow_unassigned_primaries",
true, Setting.Property.Dynamic, Property.NodeScope);

private volatile int provisionedCapacity;

private volatile double skewFactor;

private volatile boolean allowUnassignedPrimaries;

private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class);

public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings);
this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings);
this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
this::setSkewFactor);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
this::setProvisionedCapacity);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
this::setAllowUnassignedPrimaries);
}

private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) {
this.allowUnassignedPrimaries = allowUnassignedPrimaries;
}

private void setSkewFactor(double skewFactor) {
this.skewFactor = skewFactor;
}

private void setProvisionedCapacity(int provisionedCapacity) {
this.provisionedCapacity = provisionedCapacity;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, (count, limit) -> count > limit);
}

private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation,
BiPredicate<Integer, Integer> decider) {
if (provisionedCapacity <= 0 || skewFactor < 0 ) {
return allocation.decision(Decision.YES, NAME,
"overload awareness allocation is not enabled, set cluster setting [%s] and cluster setting [%s] to enable it",
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(),
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey());
}
if (shardRouting.unassigned() && shardRouting.primary() && allowUnassignedPrimaries) {
return allocation.decision(Decision.YES, NAME,
"overload allocation awareness is allowed for unassigned primaries, set cluster setting [%s] to disable it",
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey());
}
Metadata metadata = allocation.metadata();
float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity;
int nodeShardCount = node.numberOfOwningShards();
int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
if (decider.test(nodeShardCount, limit)) {
logger.debug(() -> new ParameterizedMessage("Too many shards [{}] allocated to this node [{}]. Expected average shards" +
" per node [{}], overload factor [{}], node limit [{}]", nodeShardCount, node.nodeId(), expectedAvgShardsPerNode,
skewFactor, limit));
return allocation.decision(Decision.NO, NAME, "too many shards [%d] allocated to this node, limit per node [%d] considering" +
" overload factor [%.2f] based on capacity [%d]", nodeShardCount, limit, skewFactor, provisionedCapacity);
}
return allocation.decision(Decision.YES, NAME, "node meets all skew awareness attribute requirements");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
Expand Down Expand Up @@ -580,7 +581,10 @@ public void apply(Settings value, Settings current, Settings previous) {
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
TransportMainAction.OVERRIDE_MAIN_RESPONSE_VERSION,
IndexingPressure.MAX_INDEXING_BYTES)));
IndexingPressure.MAX_INDEXING_BYTES,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING)));

public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
Expand Down Expand Up @@ -230,7 +231,8 @@ public void testAllocationDeciderOrder() {
DiskThresholdDecider.class,
ThrottlingAllocationDecider.class,
ShardsLimitAllocationDecider.class,
AwarenessAllocationDecider.class);
AwarenessAllocationDecider.class,
NodeLoadAwareAllocationDecider.class);
Collection<AllocationDecider> deciders = ClusterModule.createAllocationDeciders(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptyList());
Iterator<AllocationDecider> iter = deciders.iterator();
Expand Down
Loading

0 comments on commit 43052f4

Please sign in to comment.