-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
ShardLimitValidator.java
181 lines (161 loc) · 7.67 KB
/
ShardLimitValidator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.indices;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ValidationException;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.Index;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
/**
* This class contains the logic used to check the cluster-wide shard limit before shards are created and ensuring that the limit is
* updated correctly on setting updates, etc.
*
* NOTE: This is the limit applied at *shard creation time*. If you are looking for the limit applied at *allocation* time, which is
* controlled by a different setting,
* see {@link org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider}.
*
* @opensearch.internal
*/
public class ShardLimitValidator {

    /** Dynamic cluster setting capping total shards per data node. Min 1, default 1000, no upper bound. */
    public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE = Setting.intSetting(
        "cluster.max_shards_per_node",
        1000,
        1,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    // AtomicInteger so dynamic setting updates published on the cluster-settings thread are
    // immediately visible to validation calls on other threads.
    protected final AtomicInteger shardLimitPerNode = new AtomicInteger();
    private final SystemIndices systemIndices;

    /**
     * Creates a validator seeded from {@code settings} and registered for dynamic updates of
     * {@link #SETTING_CLUSTER_MAX_SHARDS_PER_NODE}.
     *
     * @param settings       node settings used for the initial limit value
     * @param clusterService used only to subscribe to dynamic setting changes
     * @param systemIndices  used to exempt system indices from the limit
     */
    public ShardLimitValidator(final Settings settings, ClusterService clusterService, SystemIndices systemIndices) {
        this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings));
        clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode);
        this.systemIndices = systemIndices;
    }

    private void setShardLimitPerNode(int newValue) {
        this.shardLimitPerNode.set(newValue);
    }

    /**
     * Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting.
     * @return the current value of the setting
     */
    public int getShardLimitPerNode() {
        return shardLimitPerNode.get();
    }

    /**
     * Checks whether an index can be created without going over the cluster shard limit.
     * Counts primaries plus all replicas of the new index against the limit.
     *
     * @param indexName the name of the index being created
     * @param settings  the settings of the index to be created
     * @param state     the current cluster state
     * @throws ValidationException if creating this index would put the cluster over the cluster shard limit
     */
    public void validateShardLimit(final String indexName, final Settings settings, final ClusterState state) {
        // Validate shard limit only for non system indices as it is not a hard limit anyways
        if (systemIndices.validateSystemIndex(indexName)) {
            return;
        }

        final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings);
        final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
        // Each primary is accompanied by `numberOfReplicas` replica copies.
        final int shardsToCreate = numberOfShards * (1 + numberOfReplicas);

        final Optional<String> shardLimit = checkShardLimit(shardsToCreate, state);
        if (shardLimit.isPresent()) {
            final ValidationException e = new ValidationException();
            e.addValidationError(shardLimit.get());
            throw e;
        }
    }

    /**
     * Validates whether a list of indices can be opened without going over the cluster shard limit. Only counts indices which are
     * currently closed and will be opened, ignores indices which are already open.
     *
     * @param currentState  The current cluster state.
     * @param indicesToOpen The indices which are to be opened.
     * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
     */
    public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) {
        int shardsToOpen = Arrays.stream(indicesToOpen)
            // Validate shard limit only for non system indices as it is not a hard limit anyways
            .filter(index -> !systemIndices.validateSystemIndex(index.getName()))
            // Already-open indices are already counted in getTotalOpenIndexShards().
            .filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE))
            .mapToInt(index -> getTotalShardCount(currentState, index))
            .sum();

        Optional<String> error = checkShardLimit(shardsToOpen, currentState);
        if (error.isPresent()) {
            ValidationException ex = new ValidationException();
            ex.addValidationError(error.get());
            throw ex;
        }
    }

    /** Returns the total shard copies (primaries plus replicas) of the given index. */
    private static int getTotalShardCount(ClusterState state, Index index) {
        IndexMetadata indexMetadata = state.metadata().index(index);
        return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
    }

    /**
     * Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit.
     * Returns an error message if appropriate, or an empty {@link Optional} otherwise.
     *
     * @param newShards The number of shards to be added by this operation
     * @param state     The current cluster state
     * @return If present, an error message to be given as the reason for failing
     *         an operation. If empty, a sign that the operation is valid.
     */
    public Optional<String> checkShardLimit(int newShards, ClusterState state) {
        return checkShardLimit(newShards, state, getShardLimitPerNode());
    }

    // package-private for testing
    static Optional<String> checkShardLimit(int newShards, ClusterState state, int maxShardsPerNodeSetting) {
        int nodeCount = state.getNodes().getDataNodes().size();

        // Only enforce the shard limit if we have at least one data node, so that we don't block
        // index creation during cluster setup
        if (nodeCount == 0 || newShards < 0) {
            return Optional.empty();
        }

        // Widen to long before multiplying/adding: the per-node setting has no upper bound, so
        // int arithmetic could overflow and silently disable the limit check.
        long maxShardsInCluster = (long) maxShardsPerNodeSetting * nodeCount;
        int currentOpenShards = state.getMetadata().getTotalOpenIndexShards();

        if (((long) currentOpenShards + newShards) > maxShardsInCluster) {
            String errorMessage = "this action would add ["
                + newShards
                + "] total shards, but this cluster currently has ["
                + currentOpenShards
                + "]/["
                + maxShardsInCluster
                + "] maximum shards open";
            return Optional.of(errorMessage);
        }
        return Optional.empty();
    }
}