Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds wait_for_no_initializing_shards to cluster health API #27489

Merged
merged 6 commits into from
Nov 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.cluster.health;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
Expand All @@ -40,6 +41,7 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
private TimeValue timeout = new TimeValue(30, TimeUnit.SECONDS);
private ClusterHealthStatus waitForStatus;
private boolean waitForNoRelocatingShards = false;
private boolean waitForNoInitializingShards = false;
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;
Expand Down Expand Up @@ -72,6 +74,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
if (in.readBoolean()) {
waitForEvents = Priority.readFrom(in);
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
waitForNoInitializingShards = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -101,6 +106,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
Priority.writeTo(waitForEvents, out);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(waitForNoInitializingShards);
}
}

@Override
Expand Down Expand Up @@ -167,6 +175,21 @@ public ClusterHealthRequest waitForNoRelocatingShards(boolean waitForNoRelocatin
return this;
}

public boolean waitForNoInitializingShards() {
return waitForNoInitializingShards;
}

/**
* Sets whether the request should wait for there to be no initializing shards before
* retrieving the cluster health status. Defaults to {@code false}, meaning the
* operation does not wait on there being no more initializing shards. Set to <code>true</code>
* to wait until the number of initializing shards in the cluster is 0.
*/
public ClusterHealthRequest waitForNoInitializingShards(boolean waitForNoInitializingShards) {
this.waitForNoInitializingShards = waitForNoInitializingShards;
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ public ClusterHealthRequestBuilder setWaitForNoRelocatingShards(boolean waitForR
return this;
}

/**
* Sets whether the request should wait for there to be no initializing shards before
* retrieving the cluster health status. Defaults to <code>false</code>, meaning the
* operation does not wait on there being no more initializing shards. Set to <code>true</code>
* to wait until the number of initializing shards in the cluster is 0.
*/
public ClusterHealthRequestBuilder setWaitForNoInitializingShards(boolean waitForNoInitializingShards) {
request.waitForNoInitializingShards(waitForNoInitializingShards);
return this;
}

/**
* Sets the number of shard copies that must be active before getting the health status.
* Defaults to {@link ActiveShardCount#NONE}, meaning we don't wait on any active shards.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,24 +142,26 @@ public void onFailure(String source, Exception e) {
}

private void executeHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
int waitFor = 5;
if (request.waitForStatus() == null) {
waitFor--;
int waitFor = 0;
if (request.waitForStatus() != null) {
waitFor++;
}
if (request.waitForNoRelocatingShards() == false) {
waitFor--;
if (request.waitForNoRelocatingShards()) {
waitFor++;
}
if (request.waitForActiveShards().equals(ActiveShardCount.NONE)) {
waitFor--;
if (request.waitForNoInitializingShards()) {
waitFor++;
}
if (request.waitForNodes().isEmpty()) {
waitFor--;
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
waitFor++;
}
if (request.waitForNodes().isEmpty() == false) {
waitFor++;
}
if (request.indices() == null || request.indices().length == 0) { // check that they actually exists in the meta data
waitFor--;
if (request.indices() != null && request.indices().length > 0) { // check that they actually exists in the meta data
waitFor++;
}

assert waitFor >= 0;
final ClusterState state = clusterService.state();
final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, null, logger, threadPool.getThreadContext());
if (request.timeout().millis() == 0) {
Expand Down Expand Up @@ -196,13 +198,15 @@ public void onTimeout(TimeValue timeout) {
private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
return prepareResponse(request, response, clusterState, waitFor);
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
return readyCounter == waitFor;
}

private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
boolean valid = prepareResponse(request, response, clusterState, waitFor);
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
boolean valid = (readyCounter == waitFor);
assert valid || timedOut;
// we check for a timeout here since this method might be called from the wait_for_events
// response handler which might have timed out already.
Expand All @@ -213,14 +217,18 @@ private ClusterHealthResponse getResponse(final ClusterHealthRequest request, Cl
return response;
}

private boolean prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response, ClusterState clusterState, final int waitFor) {
static int prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response,
final ClusterState clusterState, final IndexNameExpressionResolver indexNameExpressionResolver) {
int waitForCounter = 0;
if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
waitForCounter++;
}
if (request.waitForNoRelocatingShards() && response.getRelocatingShards() == 0) {
waitForCounter++;
}
if (request.waitForNoInitializingShards() && response.getInitializingShards() == 0) {
waitForCounter++;
}
if (request.waitForActiveShards().equals(ActiveShardCount.NONE) == false) {
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
assert waitForActiveShards.equals(ActiveShardCount.DEFAULT) == false :
Expand Down Expand Up @@ -292,7 +300,7 @@ private boolean prepareResponse(final ClusterHealthRequest request, final Cluste
}
}
}
return waitForCounter == waitFor;
return waitForCounter;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
}
clusterHealthRequest.waitForNoRelocatingShards(
request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
clusterHealthRequest.waitForNoInitializingShards(
request.paramAsBoolean("wait_for_no_initializing_shards", clusterHealthRequest.waitForNoRelocatingShards()));
if (request.hasParam("wait_for_relocating_shards")) {
// wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.health;

import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.core.IsEqual.equalTo;

public class ClusterHealthRequestTests extends ESTestCase {
public void testSerialize() throws Exception {
final ClusterHealthRequest originalRequest = randomRequest();
final ClusterHealthRequest cloneRequest;
try (BytesStreamOutput out = new BytesStreamOutput()) {
originalRequest.writeTo(out);
try (StreamInput in = out.bytes().streamInput()) {
cloneRequest = new ClusterHealthRequest(in);
}
}
assertThat(cloneRequest.waitForStatus(), equalTo(originalRequest.waitForStatus()));
assertThat(cloneRequest.waitForNodes(), equalTo(originalRequest.waitForNodes()));
assertThat(cloneRequest.waitForNoInitializingShards(), equalTo(originalRequest.waitForNoInitializingShards()));
assertThat(cloneRequest.waitForNoRelocatingShards(), equalTo(originalRequest.waitForNoRelocatingShards()));
assertThat(cloneRequest.waitForActiveShards(), equalTo(originalRequest.waitForActiveShards()));
assertThat(cloneRequest.waitForEvents(), equalTo(originalRequest.waitForEvents()));
}

ClusterHealthRequest randomRequest() {
ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForStatus(randomFrom(ClusterHealthStatus.values()));
request.waitForNodes(randomFrom("", "<", "<=", ">", ">=") + between(0, 1000));
request.waitForNoInitializingShards(randomBoolean());
request.waitForNoRelocatingShards(randomBoolean());
request.waitForActiveShards(randomIntBetween(0, 10));
request.waitForEvents(randomFrom(Priority.values()));
return request;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.cluster.health;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

import static org.hamcrest.core.IsEqual.equalTo;

public class TransportClusterHealthActionTests extends ESTestCase {

public void testWaitForInitializingShards() throws Exception {
final String[] indices = {"test"};
final ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForNoInitializingShards(true);
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 0);
ClusterHealthResponse response = new ClusterHealthResponse("", indices, clusterState);
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(1));

request.waitForNoInitializingShards(true);
clusterState = randomClusterStateWithInitializingShards("test", between(1, 10));
response = new ClusterHealthResponse("", indices, clusterState);
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));

request.waitForNoInitializingShards(false);
clusterState = randomClusterStateWithInitializingShards("test", randomInt(20));
response = new ClusterHealthResponse("", indices, clusterState);
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
}

ClusterState randomClusterStateWithInitializingShards(String index, final int initializingShards) {
final IndexMetaData indexMetaData = IndexMetaData
.builder(index)
.settings(settings(Version.CURRENT))
.numberOfShards(between(1, 10))
.numberOfReplicas(randomInt(20))
.build();

final List<ShardRoutingState> shardRoutingStates = new ArrayList<>();
IntStream.range(0, between(1, 30)).forEach(i -> shardRoutingStates.add(randomFrom(
ShardRoutingState.STARTED, ShardRoutingState.UNASSIGNED, ShardRoutingState.RELOCATING)));
IntStream.range(0, initializingShards).forEach(i -> shardRoutingStates.add(ShardRoutingState.INITIALIZING));
Randomness.shuffle(shardRoutingStates);

final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final IndexRoutingTable.Builder routingTable = new IndexRoutingTable.Builder(indexMetaData.getIndex());

// Primary
{
ShardRoutingState state = shardRoutingStates.remove(0);
String node = state == ShardRoutingState.UNASSIGNED ? null : "node";
routingTable.addShard(
TestShardRouting.newShardRouting(shardId, node, "relocating", true, state)
);
}

// Replicas
for (int i = 0; i < shardRoutingStates.size(); i++) {
ShardRoutingState state = shardRoutingStates.get(i);
String node = state == ShardRoutingState.UNASSIGNED ? null : "node" + i;
routingTable.addShard(TestShardRouting.newShardRouting(shardId, node, "relocating"+i, randomBoolean(), state));
}

return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(MetaData.builder().put(indexMetaData, true))
.routingTable(RoutingTable.builder().add(routingTable.build()).build())
.build();
}
}
5 changes: 5 additions & 0 deletions docs/reference/cluster/health.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ The cluster health API accepts the following request parameters:
for the cluster to have no shard relocations. Defaults to false, which means
it will not wait for relocating shards.

`wait_for_no_initializing_shards`::
A boolean value which controls whether to wait (until the timeout provided)
for the cluster to have no shard initializations. Defaults to false, which means
it will not wait for initializing shards.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh missed that, sorry.

`wait_for_active_shards`::
A number controlling to how many active shards to wait for, `all` to wait
for all shards in the cluster to be active, or `0` to not wait. Defaults to `0`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
"type" : "boolean",
"description" : "Whether to wait until there are no relocating shards in the cluster"
},
"wait_for_no_initializing_shards": {
"type" : "boolean",
"description" : "Whether to wait until there are no initializing shards in the cluster"
},
"wait_for_status": {
"type" : "enum",
"options" : ["green","yellow","red"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,27 @@
- match: { unassigned_shards: 0 }
- gte: { number_of_pending_tasks: 0 }

---
"cluster health basic test, one index with wait for no initializing shards":
- skip:
version: " - 6.99.99"
reason: "wait_for_no_initializing_shards is introduced in 7.0.0"

- do:
indices.create:
index: test_index
wait_for_active_shards: 0
body:
settings:
index:
number_of_replicas: 0

- do:
cluster.health:
wait_for_no_initializing_shards: true

- match: { initializing_shards: 0 }

---
"cluster health levels":
- do:
Expand Down