Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle shard over allocation during partial zone/rack or independent node failures #1149

Merged
merged 10 commits into from
Sep 20, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
Expand Down Expand Up @@ -256,6 +257,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Setting.Property;

import java.util.function.BiPredicate;

/**
* This {@link NodeLoadAwareAllocationDecider} controls shard over-allocation
* due to node failures or otherwise on the surviving nodes. The allocation limits
* are decided by the user provisioned capacity, to determine if there were lost nodes
* <pre>
* cluster.routing.allocation.overload_awareness.provisioned_capacity: N

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please document the expectation from admin that this setting is supposed to be updated whenever the cluster is scaled up or down?

* </pre>
* <p>
* and prevent allocation on the surviving nodes of the under capacity cluster
* based on overload factor defined as a percentage by
* <pre>
* cluster.routing.allocation.load_awareness.skew_factor: X
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
* </pre>
* The total limit per node based on skew_factor doesn't limit primaries that previously
* existed on the disk as those shards are force allocated by
* {@link AllocationDeciders#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)}
* however new primaries due to index creation, snapshot restore etc can be controlled via the below settings
* <pre>
* cluster.routing.allocation.load_awareness.allow_unassigned_primaries
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
* </pre>
*/
public class NodeLoadAwareAllocationDecider extends AllocationDecider {

public static final String NAME = "load_awareness";

public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING =
Setting.intSetting("cluster.routing.allocation.load_awareness.provisioned_capacity", -1, -1,
Property.Dynamic, Property.NodeScope);
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING =
Setting.intSetting("cluster.routing.allocation.load_awareness.skew_factor", 50, -1, Property.Dynamic,
Property.NodeScope);
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING =
Setting.boolSetting("cluster.routing.allocation.load_awareness.allow_unassigned_primaries",
true, Setting.Property.Dynamic, Property.NodeScope);

private volatile int provisionedCapacity;

private volatile int skewFactor;

private volatile boolean allowUnassignedPrimaries;

private static final Logger logger = LogManager.getLogger(NodeLoadAwareAllocationDecider.class);

public NodeLoadAwareAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.skewFactor = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.get(settings);
this.provisionedCapacity = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.get(settings);
this.allowUnassignedPrimaries = CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
this::setSkewFactor);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
this::setProvisionedCapacity);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING,
this::setAllowUnassignedPrimaries);
}

private void setAllowUnassignedPrimaries(boolean allowUnassignedPrimaries) {
this.allowUnassignedPrimaries = allowUnassignedPrimaries;
}

private void setSkewFactor(int skewFactor) {
this.skewFactor = skewFactor;
}

private void setProvisionedCapacity(int provisionedCapacity) {
this.provisionedCapacity = provisionedCapacity;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, (count, limit) -> count >= limit);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, (count, limit) -> count > limit);
}

private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation,
BiPredicate<Integer, Integer> decider) {
if (provisionedCapacity <= 0 || skewFactor < 0 ) {
return allocation.decision(Decision.YES, NAME,
"overload awareness allocation is not enabled, set cluster setting [%s] and cluster setting [%s] to enable it",
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING.getKey(),
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING.getKey());
}
if (shardRouting.unassigned() && shardRouting.primary() && allowUnassignedPrimaries) {
return allocation.decision(Decision.YES, NAME,
"overload allocation awareness is allowed for unassigned primaries, set cluster setting [%s] to disable it",
CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING.getKey());
Comment on lines +115 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the value add from the additional knob on allow unassigned primary here - when would a production cluster set it to false?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a change in behaviour from what cluster.routing.allocation.total_shards_per_node limit offers. It doesn't allow unassigned primaries. If users want a similar behaviour as provided by this setting they can opt in for disabling this

}
Metadata metadata = allocation.metadata();
float expectedAvgShardsPerNode = (float) metadata.getTotalNumberOfShards() / provisionedCapacity;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is assuming every data node is equal - wont it need to factor in node attributes eg. hot/warm ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it some thought. Few reasons why I think keeping it simple and supporting only homogenous deployments as the only default support for now makes sense

  1. The routing.allocation.require index setting doesn't guarantee that other shards can't get allocated to the node carrying that particular attribute. So unless all indices mandatorily have mutually exclusive routing.allocation.require settings, it's hard to make a call.
  2. The node.attr isn't strictly a deployment characteristic, unlike node.role and hence there could be different implementations possible around hot/warm
  3. The settings needed as input would make it quite hard to manage for eg there could be different node.attr for the same node and for each node key there can be different values. We would need some sort of multi-level provisioned capacity settings per attr key, per attr value

Comment on lines +120 to +121
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With node exclusion settings enabled, such as with ip or zone attribute, the expectedAvgShardsPerNode calculation can go wrong unless the provisionedCapacity is explicitly updated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provisioned capacity is what is actually provisioned and any infra changes would need to appropriately adjust it. For most cases the overload factor should be able to provide sufficient cushion

int nodeShardCount = node.numberOfOwningShards();
int limit = (int) Math.ceil(expectedAvgShardsPerNode * (1 + skewFactor / 100.0));
if (decider.test(nodeShardCount, limit)) {
logger.debug(() -> new ParameterizedMessage("Too many shards [{}] allocated to this node [{}]. Expected average shards" +
" per node [{}], overload factor [{}], node limit [{}]", nodeShardCount, node.nodeId(), expectedAvgShardsPerNode,
skewFactor, limit));
return allocation.decision(Decision.NO, NAME,
"too many shards [%d] allocated to this node, limit per node [%d] for overload factor [%s] based on capacity [%d]",
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved
nodeShardCount, limit, skewFactor, provisionedCapacity);
}
return allocation.decision(Decision.YES, NAME, "node meets all skew awareness attribute requirements");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.logging.log4j.LogManager;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
Expand Down Expand Up @@ -580,7 +581,10 @@ public void apply(Settings value, Settings current, Settings previous) {
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
TransportMainAction.OVERRIDE_MAIN_RESPONSE_VERSION,
IndexingPressure.MAX_INDEXING_BYTES)));
IndexingPressure.MAX_INDEXING_BYTES,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_PROVISIONED_CAPACITY_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_SKEW_FACTOR_SETTING,
NodeLoadAwareAllocationDecider.CLUSTER_ROUTING_ALLOCATION_LOAD_AWARENESS_ALLOW_UNASSIGNED_PRIMARIES_SETTING)));

public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
SniffConnectionStrategy.SEARCH_REMOTE_CLUSTER_SEEDS_UPGRADER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
Expand Down Expand Up @@ -230,7 +231,8 @@ public void testAllocationDeciderOrder() {
DiskThresholdDecider.class,
ThrottlingAllocationDecider.class,
ShardsLimitAllocationDecider.class,
AwarenessAllocationDecider.class);
AwarenessAllocationDecider.class,
NodeLoadAwareAllocationDecider.class);
Collection<AllocationDecider> deciders = ClusterModule.createAllocationDeciders(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), Collections.emptyList());
Iterator<AllocationDecider> iter = deciders.iterator();
Expand Down
Loading