From 3f9993dc262b50e550f91042ca2416bbb28e5dcb Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 26 Feb 2019 18:34:03 +0100 Subject: [PATCH] Wait for shards to be active after closing indices (#38854) This commit changes the Close Index API to add a `wait_for_active_shards` parameter that allows to wait for shards of closed indices to be active before returning a response. Relates #33888 --- .../rest-api-spec/api/indices.close.json | 4 + .../test/indices.open/10_basic.yml | 24 ++++ .../test/indices.open/20_multiple_indices.yml | 6 + .../admin/indices/close/CloseIndexAction.java | 7 +- .../CloseIndexClusterStateUpdateRequest.java | 18 ++- .../indices/close/CloseIndexRequest.java | 20 +++ .../close/CloseIndexRequestBuilder.java | 31 ++++- .../indices/close/CloseIndexResponse.java | 52 ++++++++ .../close/TransportCloseIndexAction.java | 27 +++-- .../client/IndicesAdminClient.java | 5 +- .../client/support/AbstractClient.java | 5 +- .../metadata/MetaDataIndexStateService.java | 29 ++++- .../admin/indices/RestCloseIndexAction.java | 5 + .../indices/close/CloseIndexRequestTests.java | 114 ++++++++++++++++++ .../close/CloseIndexResponseTests.java | 86 +++++++++++++ .../index/shard/IndexShardIT.java | 4 +- .../indices/IndicesLifecycleListenerIT.java | 10 +- .../indices/state/CloseIndexIT.java | 26 +++- .../indices/state/ReopenWhileClosingIT.java | 6 +- .../indices/state/SimpleIndexStateIT.java | 2 +- .../ccr/action/ShardFollowTasksExecutor.java | 3 +- .../xpack/ccr/CloseFollowerIndexIT.java | 8 +- .../action/TransportFreezeIndexAction.java | 5 +- .../CloseFollowerIndexStepTests.java | 6 +- 24 files changed, 459 insertions(+), 44 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json index 4eaa93030ee7b..55fd245f26c91 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json @@ -34,6 +34,10 @@ "options" : ["open","closed","none","all"], "default" : "open", "description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both." + }, + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of active shards to wait for before the operation returns." } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml index 64e59d5939287..a389fee9bf761 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/10_basic.yml @@ -14,6 +14,7 @@ - do: indices.close: index: test_index + - is_true: acknowledged - do: catch: bad_request @@ -24,6 +25,7 @@ - do: indices.open: index: test_index + - is_true: acknowledged - do: cluster.health: @@ -50,11 +52,33 @@ - do: indices.close: index: test_index + - is_true: acknowledged - do: indices.open: index: test_index wait_for_active_shards: all + - is_true: acknowledged + - match: { acknowledged: true } + - match: { shards_acknowledged: true } + +--- +"Close index with wait_for_active_shards set to all": + - skip: + version: " - 7.99.99" + reason: "closed indices are replicated starting version 8.0" + + - do: + indices.create: + index: test_index + body: + settings: + number_of_replicas: 0 + - do: + indices.close: + index: test_index + wait_for_active_shards: all + - is_true: acknowledged - match: { acknowledged: true } - match: { shards_acknowledged: true } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml index 8e1bf660f6378..bef5ea8a54651 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.open/20_multiple_indices.yml @@ -26,6 +26,7 @@ setup: - do: indices.close: index: _all + - is_true: acknowledged - do: catch: bad_request @@ -36,6 +37,7 @@ setup: - do: indices.open: index: _all + - is_true: acknowledged - do: cluster.health: @@ -51,6 +53,7 @@ setup: - do: indices.close: index: test_* + - is_true: acknowledged - do: catch: bad_request @@ -61,6 +64,7 @@ setup: - do: indices.open: index: test_* + - is_true: acknowledged - do: cluster.health: @@ -76,6 +80,7 @@ setup: - do: indices.close: index: '*' + - is_true: acknowledged - do: catch: bad_request @@ -86,6 +91,7 @@ setup: - do: indices.open: index: '*' + - is_true: acknowledged - do: cluster.health: diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java index 68a911ff58627..5c3d60dd44013 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexAction.java @@ -20,9 +20,8 @@ package org.elasticsearch.action.admin.indices.close; import org.elasticsearch.action.Action; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -public class CloseIndexAction extends Action { +public class CloseIndexAction extends Action { public static final CloseIndexAction INSTANCE = new CloseIndexAction(); public static final String NAME = "indices:admin/close"; @@ -32,7 +31,7 @@ private CloseIndexAction() { } @Override - public AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + public CloseIndexResponse newResponse() { + return new CloseIndexResponse(); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java index bb0f98ac07b7e..955ddf6fe8a76 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexClusterStateUpdateRequest.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest; /** @@ -25,7 +26,8 @@ */ public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest { - private final long taskId; + private long taskId; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; public CloseIndexClusterStateUpdateRequest(final long taskId) { this.taskId = taskId; @@ -34,4 +36,18 @@ public CloseIndexClusterStateUpdateRequest(final long taskId) { public long taskId() { return taskId; } + + public CloseIndexClusterStateUpdateRequest taskId(final long taskId) { + this.taskId = taskId; + return this; + } + + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + public CloseIndexClusterStateUpdateRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java index 272bae9425712..e7979beb68214 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequest.java @@ -19,8 +19,10 @@ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.io.stream.StreamInput; @@ -38,6 +40,7 @@ public class CloseIndexRequest extends AcknowledgedRequest im private String[] indices; private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen(); + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; //NORELEASE Changes this to NONE to keep previous behavior public CloseIndexRequest() { } @@ -101,11 +104,25 @@ public CloseIndexRequest indicesOptions(IndicesOptions indicesOptions) { return this; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + public CloseIndexRequest waitForActiveShards(final ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); indices = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + waitForActiveShards = ActiveShardCount.readFrom(in); + } else { + waitForActiveShards = ActiveShardCount.NONE; + } } @Override @@ -113,5 +130,8 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeStringArray(indices); indicesOptions.writeIndicesOptions(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + waitForActiveShards.writeTo(out); + } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java index e69c6fed87dcd..7db79e0c3e550 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestBuilder.java @@ -19,16 +19,16 @@ package org.elasticsearch.action.admin.indices.close; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; /** * Builder for close index request */ public class CloseIndexRequestBuilder - extends AcknowledgedRequestBuilder { + extends AcknowledgedRequestBuilder { public CloseIndexRequestBuilder(ElasticsearchClient client, CloseIndexAction action) { super(client, action, new CloseIndexRequest()); @@ -60,4 +60,31 @@ public CloseIndexRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) request.indicesOptions(indicesOptions); return this; } + + /** + * Sets the number of shard copies that should be active for indices closing to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * wait for all shards (primary and all replicas) to be active before returning. + * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Indices closing will only wait up until the timeout value for the number of shard copies + * to be active before returning. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CloseIndexRequestBuilder setWaitForActiveShards(final ActiveShardCount waitForActiveShards) { + request.waitForActiveShards(waitForActiveShards); + return this; + } + + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public CloseIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) { + return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java new file mode 100644 index 0000000000000..189712f0fca78 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponse.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class CloseIndexResponse extends ShardsAcknowledgedResponse { + + CloseIndexResponse() { + } + + public CloseIndexResponse(final boolean acknowledged, final boolean shardsAcknowledged) { + super(acknowledged, shardsAcknowledged); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + readShardsAcknowledged(in); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + writeShardsAcknowledged(out); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java index bb3db084b0c53..05f680af57ddf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportCloseIndexAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DestructiveOperations; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -44,7 +43,7 @@ /** * Close index action */ -public class TransportCloseIndexAction extends TransportMasterNodeAction { +public class TransportCloseIndexAction extends TransportMasterNodeAction { private final MetaDataIndexStateService indexStateService; private final DestructiveOperations destructiveOperations; @@ -76,12 +75,12 @@ protected String executor() { } @Override - protected AcknowledgedResponse newResponse() { - return new AcknowledgedResponse(); + protected CloseIndexResponse newResponse() { + return new CloseIndexResponse(); } @Override - protected void doExecute(Task task, CloseIndexRequest request, ActionListener listener) { + protected void doExecute(Task task, CloseIndexRequest request, ActionListener listener) { destructiveOperations.failDestructive(request.indices()); if (closeIndexEnabled == false) { throw new IllegalStateException("closing indices is disabled - set [" + CLUSTER_INDICES_CLOSE_ENABLE_SETTING.getKey() + @@ -97,29 +96,33 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta } @Override - protected void masterOperation(final CloseIndexRequest request, final ClusterState state, - final ActionListener listener) { + protected void masterOperation(final CloseIndexRequest request, + final ClusterState state, + final ActionListener listener) { throw new UnsupportedOperationException("The task parameter is required"); } @Override - protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state, - final ActionListener listener) throws Exception { + protected void masterOperation(final Task task, + final CloseIndexRequest request, + final ClusterState state, + final ActionListener listener) throws Exception { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { - listener.onResponse(new AcknowledgedResponse(true)); + listener.onResponse(new CloseIndexResponse(true, false)); return; } final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId()) .ackTimeout(request.timeout()) .masterNodeTimeout(request.masterNodeTimeout()) + .waitForActiveShards(request.waitForActiveShards()) .indices(concreteIndices); - indexStateService.closeIndices(closeRequest, new ActionListener() { + indexStateService.closeIndices(closeRequest, new ActionListener() { @Override - public void onResponse(final AcknowledgedResponse response) { + public void onResponse(final CloseIndexResponse response) { listener.onResponse(response); } diff --git a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java index 718dde98a0f97..d5a73981f29f1 100644 --- a/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/IndicesAdminClient.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -307,7 +308,7 @@ public interface IndicesAdminClient extends ElasticsearchClient { * @return The result future * @see org.elasticsearch.client.Requests#closeIndexRequest(String) */ - ActionFuture close(CloseIndexRequest request); + ActionFuture close(CloseIndexRequest request); /** * Closes an index based on the index name. @@ -316,7 +317,7 @@ public interface IndicesAdminClient extends ElasticsearchClient { * @param listener A listener to be notified with a result * @see org.elasticsearch.client.Requests#closeIndexRequest(String) */ - void close(CloseIndexRequest request, ActionListener listener); + void close(CloseIndexRequest request, ActionListener listener); /** * Closes one or more indices based on their index name. diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 3fc931a85c0f7..e79f0567babe6 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -152,6 +152,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexAction; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; @@ -1355,12 +1356,12 @@ public DeleteIndexRequestBuilder prepareDelete(String... indices) { } @Override - public ActionFuture close(final CloseIndexRequest request) { + public ActionFuture close(final CloseIndexRequest request) { return execute(CloseIndexAction.INSTANCE, request); } @Override - public void close(final CloseIndexRequest request, final ActionListener listener) { + public void close(final CloseIndexRequest request, final ActionListener listener) { execute(CloseIndexAction.INSTANCE, request, listener); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 4d81bf6e9c557..7c582483e3b42 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -123,7 +124,7 @@ public MetaDataIndexStateService(ClusterService clusterService, AllocationServic * Closing indices is a 3 steps process: it first adds a write block to every indices to close, then waits for the operations on shards * to be terminated and finally closes the indices by moving their state to CLOSE. */ - public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener listener) { + public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener listener) { final Index[] concreteIndices = request.indices(); if (concreteIndices == null || concreteIndices.length == 0) { throw new IllegalArgumentException("Index name is required"); @@ -143,7 +144,7 @@ public ClusterState execute(final ClusterState currentState) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { if (oldState == newState) { assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed"; - listener.onResponse(new AcknowledgedResponse(true)); + listener.onResponse(new CloseIndexResponse(true, false)); } else { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) @@ -174,7 +175,29 @@ public void onFailure(final String source, final Exception e) { @Override public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - listener.onResponse(new AcknowledgedResponse(acknowledged)); + + final String[] indices = results.entrySet().stream() + .filter(result -> result.getValue().isAcknowledged()) + .map(result -> result.getKey().getName()) + .filter(index -> newState.routingTable().hasIndex(index)) + .toArray(String[]::new); + + if (indices.length > 0) { + activeShardsObserver.waitForActiveShards(indices, request.waitForActiveShards(), + request.ackTimeout(), shardsAcknowledged -> { + if (shardsAcknowledged == false) { + logger.debug("[{}] indices closed, but the operation timed out while waiting " + + "for enough shards to be started.", Arrays.toString(indices)); + } + // acknowledged maybe be false but some indices may have been correctly closed, so + // we maintain a kind of coherency by overriding the shardsAcknowledged value + // (see ShardsAcknowledgedResponse constructor) + boolean shardsAcked = acknowledged ? shardsAcknowledged : false; + listener.onResponse(new CloseIndexResponse(acknowledged, shardsAcked)); + }, listener::onFailure); + } else { + listener.onResponse(new CloseIndexResponse(acknowledged, false)); + } } }), listener::onFailure) diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java index b2475cafcbeb6..3ee2687eb7288 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestCloseIndexAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.Strings; @@ -49,6 +50,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC closeIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", closeIndexRequest.masterNodeTimeout())); closeIndexRequest.timeout(request.paramAsTime("timeout", closeIndexRequest.timeout())); closeIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, closeIndexRequest.indicesOptions())); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + closeIndexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); + } return channel -> client.admin().indices().close(closeIndexRequest, new RestToXContentListener<>(channel)); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java new file mode 100644 index 0000000000000..53b39027a697b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexRequestTests.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; + +public class CloseIndexRequestTests extends ESTestCase { + + public void testSerialization() throws Exception { + final CloseIndexRequest request = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + + final CloseIndexRequest deserializedRequest = new CloseIndexRequest(); + try (StreamInput in = out.bytes().streamInput()) { + deserializedRequest.readFrom(in); + } + assertEquals(request.timeout(), deserializedRequest.timeout()); + assertEquals(request.masterNodeTimeout(), deserializedRequest.masterNodeTimeout()); + assertEquals(request.indicesOptions(), deserializedRequest.indicesOptions()); + assertEquals(request.getParentTask(), deserializedRequest.getParentTask()); + assertEquals(request.waitForActiveShards(), deserializedRequest.waitForActiveShards()); + assertArrayEquals(request.indices(), deserializedRequest.indices()); + } + } + + public void testBwcSerialization() throws Exception { + { + final CloseIndexRequest request = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + request.writeTo(out); + + try (StreamInput in = out.bytes().streamInput()) { + assertEquals(request.getParentTask(), TaskId.readFromStream(in)); + assertEquals(request.masterNodeTimeout(), in.readTimeValue()); + assertEquals(request.timeout(), in.readTimeValue()); + assertArrayEquals(request.indices(), in.readStringArray()); + assertEquals(request.indicesOptions(), IndicesOptions.readIndicesOptions(in)); + } + } + } + { + final CloseIndexRequest sample = randomRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + sample.getParentTask().writeTo(out); + out.writeTimeValue(sample.masterNodeTimeout()); + out.writeTimeValue(sample.timeout()); + out.writeStringArray(sample.indices()); + sample.indicesOptions().writeIndicesOptions(out); + + final CloseIndexRequest deserializedRequest = new CloseIndexRequest(); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + deserializedRequest.readFrom(in); + } + assertEquals(sample.getParentTask(), deserializedRequest.getParentTask()); + assertEquals(sample.masterNodeTimeout(), deserializedRequest.masterNodeTimeout()); + assertEquals(sample.timeout(), deserializedRequest.timeout()); + assertArrayEquals(sample.indices(), deserializedRequest.indices()); + assertEquals(sample.indicesOptions(), deserializedRequest.indicesOptions()); + assertEquals(ActiveShardCount.NONE, deserializedRequest.waitForActiveShards()); + } + } + } + + private CloseIndexRequest randomRequest() { + CloseIndexRequest request = new CloseIndexRequest(); + request.indices(generateRandomStringArray(10, 5, false, false)); + if (randomBoolean()) { + request.indicesOptions(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + } + if (randomBoolean()) { + request.timeout(randomPositiveTimeValue()); + } + if (randomBoolean()) { + request.masterNodeTimeout(randomPositiveTimeValue()); + } + if (randomBoolean()) { + request.setParentTask(randomAlphaOfLength(5), randomNonNegativeLong()); + } + if (randomBoolean()) { + request.waitForActiveShards(randomFrom(ActiveShardCount.DEFAULT, ActiveShardCount.NONE, ActiveShardCount.ONE, + ActiveShardCount.ALL)); + } + return request; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java new file mode 100644 index 0000000000000..dc859cfab63a9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/CloseIndexResponseTests.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.close; + +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; + +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.hamcrest.Matchers.equalTo; + +public class CloseIndexResponseTests extends ESTestCase { + + public void testSerialization() throws Exception { + final CloseIndexResponse response = randomResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + + final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); + try (StreamInput in = out.bytes().streamInput()) { + deserializedResponse.readFrom(in); + } + assertCloseIndexResponse(deserializedResponse, response); + } + } + + public void testBwcSerialization() throws Exception { + { + final CloseIndexResponse response = randomResponse(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + response.writeTo(out); + + final AcknowledgedResponse deserializedResponse = new AcknowledgedResponse(); + try (StreamInput in = out.bytes().streamInput()) { + deserializedResponse.readFrom(in); + } + assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + } + } + { + final AcknowledgedResponse response = new AcknowledgedResponse(randomBoolean()); + try (BytesStreamOutput out = new BytesStreamOutput()) { + response.writeTo(out); + + final CloseIndexResponse deserializedResponse = new CloseIndexResponse(); + try (StreamInput in = out.bytes().streamInput()) { + in.setVersion(randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_8_0_0))); + deserializedResponse.readFrom(in); + } + assertThat(deserializedResponse.isAcknowledged(), equalTo(response.isAcknowledged())); + } + } + } + + private CloseIndexResponse randomResponse() { + final boolean acknowledged = randomBoolean(); + final boolean shardsAcknowledged = acknowledged ? randomBoolean() : false; + return new CloseIndexResponse(acknowledged, shardsAcknowledged); + } + + private static void assertCloseIndexResponse(final CloseIndexResponse actual, final CloseIndexResponse expected) { + assertThat(actual.isAcknowledged(), equalTo(expected.isAcknowledged())); + assertThat(actual.isShardsAcknowledged(), equalTo(expected.isShardsAcknowledged())); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index d9c7e26f01da4..78395cf544ad3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -915,7 +915,7 @@ public void testShardChangesWithDefaultDocType() throws Exception { * Test that the {@link org.elasticsearch.index.engine.NoOpEngine} takes precedence over other * engine factories if the index is closed. */ - public void testNoOpEngineFactoryTakesPrecedence() throws IOException { + public void testNoOpEngineFactoryTakesPrecedence() { final String indexName = "closed-index"; createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); ensureGreen(); @@ -927,7 +927,7 @@ public void testNoOpEngineFactoryTakesPrecedence() throws IOException { final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); - final IndexService indexService = indicesService.createIndex(indexMetaData, Collections.emptyList()); + final IndexService indexService = indicesService.indexServiceSafe(indexMetaData.getIndex()); for (IndexShard indexShard : indexService) { assertThat(indexShard.getEngine(), instanceOf(NoOpEngine.class)); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java index 81cea988cd020..ac83c50fea6ae 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -211,8 +212,13 @@ public void testIndexStateShardChanged() throws Throwable { assertThat(stateChangeListenerNode1.afterCloseSettings.getAsInt(SETTING_NUMBER_OF_SHARDS, -1), equalTo(6)); assertThat(stateChangeListenerNode1.afterCloseSettings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1), equalTo(1)); - assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED); - assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED); + if (Version.CURRENT.onOrAfter(Version.V_8_0_0)) { + assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED, CREATED, RECOVERING, POST_RECOVERY, STARTED); + assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED, CREATED, RECOVERING, POST_RECOVERY, STARTED); + } else { + assertShardStatesMatch(stateChangeListenerNode1, 6, CLOSED); + assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED); + } } private static void assertShardStatesMatch(final IndexShardStateChangeListener stateChangeListener, diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index 42f29e99982cc..62d72c3f71954 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -20,9 +20,11 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.routing.ShardRouting; @@ -112,7 +114,8 @@ public void testCloseAlreadyClosedIndex() throws Exception { assertIndexIsClosed(indexName); // Second close should be acked too - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + final ActiveShardCount activeShardCount = randomFrom(ActiveShardCount.NONE, ActiveShardCount.DEFAULT, ActiveShardCount.ALL); + assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(activeShardCount))); assertIndexIsClosed(indexName); } @@ -126,7 +129,7 @@ public void testCloseUnassignedIndex() throws Exception { assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true)); - assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName))); + assertBusy(() -> assertAcked(client().admin().indices().prepareClose(indexName).setWaitForActiveShards(ActiveShardCount.NONE))); assertIndexIsClosed(indexName); } @@ -305,6 +308,25 @@ public void testConcurrentClosesAndOpens() throws Exception { indexer.totalIndexedDocs()); } + public void testCloseIndexWaitForActiveShards() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) // no replicas to avoid recoveries that could fail the index closing + .build()); + + final int nbDocs = randomIntBetween(0, 50); + indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureGreen(indexName); + + final CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose(indexName).get(); + assertThat(client().admin().cluster().prepareHealth(indexName).get().getStatus(), is(ClusterHealthStatus.GREEN)); + assertTrue(closeIndexResponse.isAcknowledged()); + assertTrue(closeIndexResponse.isShardsAcknowledged()); + assertIndexIsClosed(indexName); + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) { diff --git a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java index 083c5ab1f5510..25d8f07bbd1cd 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java @@ -20,8 +20,8 @@ package org.elasticsearch.indices.state; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -72,7 +72,7 @@ public void testReopenDuringClose() throws Exception { final CountDownLatch block = new CountDownLatch(1); final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown); - ActionFuture closeIndexResponse = client().admin().indices().prepareClose(indexName).execute(); + ActionFuture closeIndexResponse = client().admin().indices().prepareClose(indexName).execute(); assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); assertIndexIsBlocked(indexName); assertFalse(closeIndexResponse.isDone()); @@ -96,7 +96,7 @@ public void testReopenDuringCloseOnMultipleIndices() throws Exception { final CountDownLatch block = new CountDownLatch(1); final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown); - ActionFuture closeIndexResponse = client().admin().indices().prepareClose("index-*").execute(); + ActionFuture closeIndexResponse = client().admin().indices().prepareClose("index-*").execute(); assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); assertFalse(closeIndexResponse.isDone()); indices.forEach(ReopenWhileClosingIT::assertIndexIsBlocked); diff --git a/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java b/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java index 050d77a223101..854dba7fb894b 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java @@ -102,7 +102,7 @@ public void testFastCloseAfterCreateContinuesCreateAfterOpen() { assertThat(health.isTimedOut(), equalTo(false)); assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED)); - assertAcked(client().admin().indices().prepareClose("test")); + assertAcked(client().admin().indices().prepareClose("test").setWaitForActiveShards(ActiveShardCount.NONE)); logger.info("--> updating test index settings to allow allocation"); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 46b3c6e54f576..8df54c54e1478 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; @@ -193,7 +194,7 @@ private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, Runnable handler, Consumer onFailure) { CloseIndexRequest closeRequest = new CloseIndexRequest(followIndex); - CheckedConsumer onResponse = response -> { + CheckedConsumer onResponse = response -> { updateSettingsAndOpenIndex(followIndex, updatedSettings, handler, onFailure); }; followerClient.admin().indices().close(closeRequest, ActionListener.wrap(onResponse, onFailure)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java index 2f111727b08ee..7f93934fd91f8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CloseFollowerIndexIT.java @@ -26,6 +26,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singletonMap; @@ -96,7 +97,10 @@ public void testCloseAndReopenFollowerIndex() throws Exception { } atLeastDocsIndexed(followerClient(), "index2", 32); - AcknowledgedResponse response = followerClient().admin().indices().close(new CloseIndexRequest("index2")).get(); + + CloseIndexRequest closeIndexRequest = new CloseIndexRequest("index2"); + closeIndexRequest.waitForActiveShards(ActiveShardCount.NONE); + AcknowledgedResponse response = followerClient().admin().indices().close(closeIndexRequest).get(); assertThat(response.isAcknowledged(), is(true)); ClusterState clusterState = followerClient().admin().cluster().prepareState().get().getState(); @@ -126,6 +130,6 @@ public void testCloseAndReopenFollowerIndex() throws Exception { followerSearchRequest.source().trackTotalHits(true); long followerIndexDocs = followerClient().search(followerSearchRequest).actionGet().getHits().getTotalHits().value; assertThat(followerIndexDocs, equalTo(leaderIndexDocs)); - }); + }, 30L, TimeUnit.SECONDS); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java index 1efe5389d81b2..91b91ddd04f3c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/TransportFreezeIndexAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; @@ -126,9 +127,9 @@ protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeReque .masterNodeTimeout(request.masterNodeTimeout()) .indices(concreteIndices); - indexStateService.closeIndices(closeRequest, new ActionListener() { + indexStateService.closeIndices(closeRequest, new ActionListener() { @Override - public void onResponse(final AcknowledgedResponse response) { + public void onResponse(final CloseIndexResponse response) { if (response.isAcknowledged()) { toggleFrozenSettings(concreteIndices, request, listener); } else { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java index 25e1c4e481bba..368afaa26d0cc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStepTests.java @@ -8,7 +8,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -43,8 +43,8 @@ public void testCloseFollowingIndex() { CloseIndexRequest closeIndexRequest = (CloseIndexRequest) invocation.getArguments()[0]; assertThat(closeIndexRequest.indices()[0], equalTo("follower-index")); @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) invocation.getArguments()[1]; - listener.onResponse(new AcknowledgedResponse(true)); + ActionListener listener = (ActionListener) invocation.getArguments()[1]; + listener.onResponse(new CloseIndexResponse(true, true)); return null; }).when(indicesClient).close(Mockito.any(), Mockito.any());