diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index d73d33a0635c0..231f5555e8ff1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -1131,6 +1131,21 @@ public Iterable getFoundPeers() { return peerFinder.getFoundPeers(); } + /** + * If there is any current committed publication, this method cancels it. + * This method is used exclusively by tests. + * @return true if publication was cancelled, false if there is no current committed publication. + */ + boolean cancelCommittedPublication() { + synchronized (mutex) { + if (currentPublication.isPresent() && currentPublication.get().isCommitted()) { + currentPublication.get().cancel("cancelCommittedPublication"); + return true; + } + return false; + } + } + class CoordinatorPublication extends Publication { private final PublishRequest publishRequest; diff --git a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java similarity index 73% rename from server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java rename to server/src/test/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java index d2f65d1168da8..49b4086372d21 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/RareClusterStateIT.java @@ -17,12 +17,14 @@ * under the License. */ -package org.elasticsearch.indices.state; +package org.elasticsearch.cluster.coordination; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; @@ -40,7 +42,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.mapper.DocumentMapper; @@ -51,10 +53,9 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.junit.annotations.TestLogging; -import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -86,7 +87,7 @@ protected int numberOfReplicas() { return 0; } - public void testAssignmentWithJustAddedNodes() throws Exception { + public void testAssignmentWithJustAddedNodes() { internalCluster().startNode(); final String index = "index"; prepareCreate(index).setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -149,22 +150,20 @@ public void onFailure(String source, Exception e) { }); } + private ActionFuture executeAndCancelCommittedPublication( + ActionRequestBuilder req) throws Exception { + ActionFuture future = req.execute(); + assertBusy(() -> assertTrue(((Coordinator)internalCluster().getMasterNodeInstance(Discovery.class)).cancelCommittedPublication())); + return future; + } + public void testDeleteCreateInOneBulk() throws Exception { - internalCluster().startMasterOnlyNode(Settings.builder() - .put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2 - .build()); - String dataNode = internalCluster().startDataOnlyNode(Settings.builder() - .put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2 - .build()); + internalCluster().startMasterOnlyNode(); + String dataNode = internalCluster().startDataOnlyNode(); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).addMapping("type").get(); ensureGreen("test"); - // now that the cluster is stable, remove publishing timeout - assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder() - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0") - .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s"))); - // block none master node. BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(dataNode, random()); internalCluster().setDisruptionScheme(disruption); @@ -173,10 +172,14 @@ public void testDeleteCreateInOneBulk() throws Exception { refresh(); disruption.startDisrupting(); logger.info("--> delete index and recreate it"); - assertFalse(client().admin().indices().prepareDelete("test").setTimeout("200ms").get().isAcknowledged()); - assertFalse(prepareCreate("test").setTimeout("200ms").setSettings(Settings.builder().put(IndexMetaData - .SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), "0")).get().isAcknowledged()); + executeAndCancelCommittedPublication(client().admin().indices().prepareDelete("test").setTimeout("0s")) + .get(10, TimeUnit.SECONDS); + executeAndCancelCommittedPublication(prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData + .SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), "0")).setTimeout("0s")) + .get(10, TimeUnit.SECONDS); + logger.info("--> letting cluster proceed"); + disruption.stopDisrupting(); ensureGreen(TimeValue.timeValueMinutes(30), "test"); // due to publish_timeout of 0, wait for data node to have cluster state fully applied @@ -196,12 +199,7 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception { // but the change might not be on the node that performed the indexing // operation yet - Settings settings = Settings.builder() - .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design - .put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2 - .build(); - final List nodeNames = internalCluster().startNodes(2, settings); + final List nodeNames = internalCluster().startNodes(2); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); final String master = internalCluster().getMasterName(); @@ -242,19 +240,10 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception { disruption.startDisrupting(); // Add a new mapping... - final AtomicReference putMappingResponse = new AtomicReference<>(); - client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute( - new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - putMappingResponse.set(response); - } + ActionFuture putMappingResponse = + executeAndCancelCommittedPublication(client().admin().indices().preparePutMapping("index") + .setType("type").setSource("field", "type=long")); - @Override - public void onFailure(Exception e) { - putMappingResponse.set(e); - } - }); // ...and wait for mappings to be available on master assertBusy(() -> { ImmutableOpenMap indexMappings = client().admin().indices() @@ -273,36 +262,24 @@ public void onFailure(Exception e) { assertNotNull(fieldMapping); }); - final AtomicReference docIndexResponse = new AtomicReference<>(); - client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - docIndexResponse.set(response); - } - - @Override - public void onFailure(Exception e) { - docIndexResponse.set(e); - } - }); + // this request does not change the cluster state, because mapping is already created, + // we don't await and cancel committed publication + ActionFuture docIndexResponse = + client().prepareIndex("index", "type", "1").setSource("field", 42).execute(); // Wait a bit to make sure that the reason why we did not get a response // is that cluster state processing is blocked and not just that it takes // time to process the indexing request Thread.sleep(100); - assertThat(putMappingResponse.get(), equalTo(null)); - assertThat(docIndexResponse.get(), equalTo(null)); + assertFalse(putMappingResponse.isDone()); + assertFalse(docIndexResponse.isDone()); // Now make sure the indexing request finishes successfully disruption.stopDisrupting(); assertBusy(() -> { - assertThat(putMappingResponse.get(), instanceOf(AcknowledgedResponse.class)); - AcknowledgedResponse resp = (AcknowledgedResponse) putMappingResponse.get(); - assertTrue(resp.isAcknowledged()); - assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); - IndexResponse docResp = (IndexResponse) docIndexResponse.get(); - assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), - 1, docResp.getShardInfo().getTotal()); + assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); + assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); + assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); }); } @@ -312,12 +289,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { // Here we want to test that everything goes well if the mappings that // are needed for a document are not available on the replica at the // time of indexing it - final List nodeNames = internalCluster().startNodes(2, - Settings.builder() - .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design - .put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2 - .build()); + final List nodeNames = internalCluster().startNodes(2); assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut()); final String master = internalCluster().getMasterName(); @@ -359,19 +331,10 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(otherNode, random()); internalCluster().setDisruptionScheme(disruption); disruption.startDisrupting(); - final AtomicReference putMappingResponse = new AtomicReference<>(); - client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute( - new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - putMappingResponse.set(response); - } + final ActionFuture putMappingResponse = + executeAndCancelCommittedPublication(client().admin().indices().preparePutMapping("index") + .setType("type").setSource("field", "type=long")); - @Override - public void onFailure(Exception e) { - putMappingResponse.set(e); - } - }); final Index index = resolveIndex("index"); // Wait for mappings to be available on master assertBusy(() -> { @@ -384,25 +347,17 @@ public void onFailure(Exception e) { assertNotNull(mapper.mappers().getMapper("field")); }); - final AtomicReference docIndexResponse = new AtomicReference<>(); - client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - docIndexResponse.set(response); - } - - @Override - public void onFailure(Exception e) { - docIndexResponse.set(e); - } - }); + final ActionFuture docIndexResponse = client().prepareIndex("index", "type", "1").setSource("field", 42).execute(); assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists())); // index another document, this time using dynamic mappings. // The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even // if the dynamic mapping update is not applied on the replica yet. - ActionFuture dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute(); + // this request does not change the cluster state, because the mapping is dynamic, + // we need to await and cancel committed publication + ActionFuture dynamicMappingsFut = + executeAndCancelCommittedPublication(client().prepareIndex("index", "type", "2").setSource("field2", 42)); // ...and wait for second mapping to be available on master assertBusy(() -> { @@ -421,22 +376,18 @@ public void onFailure(Exception e) { // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled // and not just because it takes time to replicate the indexing request to the replica Thread.sleep(100); - assertThat(putMappingResponse.get(), equalTo(null)); - assertThat(docIndexResponse.get(), equalTo(null)); + assertFalse(putMappingResponse.isDone()); + assertFalse(docIndexResponse.isDone()); // Now make sure the indexing request finishes successfully disruption.stopDisrupting(); assertBusy(() -> { - assertThat(putMappingResponse.get(), instanceOf(AcknowledgedResponse.class)); - AcknowledgedResponse resp = (AcknowledgedResponse) putMappingResponse.get(); - assertTrue(resp.isAcknowledged()); - assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class)); - IndexResponse docResp = (IndexResponse) docIndexResponse.get(); - assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), - 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded + assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged()); + assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class)); + assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded }); - assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED)); + assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED)); } }