Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zen2ify RareClusterStateIT #38184

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,21 @@ public Iterable<DiscoveryNode> getFoundPeers() {
return peerFinder.getFoundPeers();
}

/**
 * Cancels the current publication, but only if one exists and it has already been committed.
 * Intended for use by tests only.
 *
 * @return {@code true} if a committed publication was found and cancelled; {@code false} if there
 *         is no current publication or the current publication is not committed.
 */
boolean cancelCommittedPublication() {
    synchronized (mutex) {
        // Both the check and the cancellation happen while holding the mutex.
        final boolean hasCommittedPublication = currentPublication.isPresent() && currentPublication.get().isCommitted();
        if (hasCommittedPublication == false) {
            return false;
        }
        currentPublication.get().cancel("cancelCommittedPublication");
        return true;
    }
}

class CoordinatorPublication extends Publication {

private final PublishRequest publishRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
* under the License.
*/

package org.elasticsearch.indices.state;
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -40,7 +42,7 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
Expand All @@ -51,10 +53,9 @@
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -86,7 +87,7 @@ protected int numberOfReplicas() {
return 0;
}

public void testAssignmentWithJustAddedNodes() throws Exception {
public void testAssignmentWithJustAddedNodes() {
internalCluster().startNode();
final String index = "index";
prepareCreate(index).setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
Expand Down Expand Up @@ -149,22 +150,20 @@ public void onFailure(String source, Exception e) {
});
}

/**
 * Starts executing the given request and then uses {@code assertBusy} to assert that a committed
 * publication could be cancelled on the {@link Coordinator} of the current master node.
 *
 * @param req the request builder to execute
 * @return the future of the started request; it is returned without waiting for its completion
 * @throws Exception if the {@code assertBusy} check fails
 */
private <Req extends ActionRequest, Res extends ActionResponse> ActionFuture<Res> executeAndCancelCommittedPublication(
        ActionRequestBuilder<Req, Res> req) throws Exception {
    final ActionFuture<Res> pendingResponse = req.execute();
    assertBusy(() -> {
        // The master's Discovery instance is looked up inside the check, i.e. on every evaluation.
        final Coordinator masterCoordinator = (Coordinator) internalCluster().getMasterNodeInstance(Discovery.class);
        assertTrue(masterCoordinator.cancelCommittedPublication());
    });
    return pendingResponse;
}

public void testDeleteCreateInOneBulk() throws Exception {
internalCluster().startMasterOnlyNode(Settings.builder()
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2
.build());
String dataNode = internalCluster().startDataOnlyNode(Settings.builder()
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2
.build());
internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).addMapping("type").get();
ensureGreen("test");

// now that the cluster is stable, remove publishing timeout
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0")
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s")));

// block the non-master node.
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(dataNode, random());
internalCluster().setDisruptionScheme(disruption);
Expand All @@ -173,10 +172,14 @@ public void testDeleteCreateInOneBulk() throws Exception {
refresh();
disruption.startDisrupting();
logger.info("--> delete index and recreate it");
assertFalse(client().admin().indices().prepareDelete("test").setTimeout("200ms").get().isAcknowledged());
assertFalse(prepareCreate("test").setTimeout("200ms").setSettings(Settings.builder().put(IndexMetaData
.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), "0")).get().isAcknowledged());
executeAndCancelCommittedPublication(client().admin().indices().prepareDelete("test").setTimeout("0s"))
.get(10, TimeUnit.SECONDS);
executeAndCancelCommittedPublication(prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData
.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), "0")).setTimeout("0s"))
.get(10, TimeUnit.SECONDS);

logger.info("--> letting cluster proceed");

disruption.stopDisrupting();
ensureGreen(TimeValue.timeValueMinutes(30), "test");
// due to publish_timeout of 0, wait for data node to have cluster state fully applied
Expand All @@ -196,12 +199,7 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
// but the change might not be on the node that performed the indexing
// operation yet

Settings settings = Settings.builder()
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2
.build();
final List<String> nodeNames = internalCluster().startNodes(2, settings);
final List<String> nodeNames = internalCluster().startNodes(2);
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());

final String master = internalCluster().getMasterName();
Expand Down Expand Up @@ -242,19 +240,10 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
disruption.startDisrupting();

// Add a new mapping...
final AtomicReference<Object> putMappingResponse = new AtomicReference<>();
client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
putMappingResponse.set(response);
}
ActionFuture<AcknowledgedResponse> putMappingResponse =
executeAndCancelCommittedPublication(client().admin().indices().preparePutMapping("index")
.setType("type").setSource("field", "type=long"));

@Override
public void onFailure(Exception e) {
putMappingResponse.set(e);
}
});
// ...and wait for mappings to be available on master
assertBusy(() -> {
ImmutableOpenMap<String, MappingMetaData> indexMappings = client().admin().indices()
Expand All @@ -273,36 +262,24 @@ public void onFailure(Exception e) {
assertNotNull(fieldMapping);
});

final AtomicReference<Object> docIndexResponse = new AtomicReference<>();
client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
docIndexResponse.set(response);
}

@Override
public void onFailure(Exception e) {
docIndexResponse.set(e);
}
});
// this request does not change the cluster state, because the mapping is already created,
// so we don't await and cancel a committed publication
ActionFuture<IndexResponse> docIndexResponse =
client().prepareIndex("index", "type", "1").setSource("field", 42).execute();

// Wait a bit to make sure that the reason why we did not get a response
// is that cluster state processing is blocked and not just that it takes
// time to process the indexing request
Thread.sleep(100);
assertThat(putMappingResponse.get(), equalTo(null));
assertThat(docIndexResponse.get(), equalTo(null));
assertFalse(putMappingResponse.isDone());
assertFalse(docIndexResponse.isDone());

// Now make sure the indexing request finishes successfully
disruption.stopDisrupting();
assertBusy(() -> {
assertThat(putMappingResponse.get(), instanceOf(AcknowledgedResponse.class));
AcknowledgedResponse resp = (AcknowledgedResponse) putMappingResponse.get();
assertTrue(resp.isAcknowledged());
assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
IndexResponse docResp = (IndexResponse) docIndexResponse.get();
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
1, docResp.getShardInfo().getTotal());
assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged());
assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class));
assertEquals(1, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal());
});
}

Expand All @@ -312,12 +289,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
// Here we want to test that everything goes well if the mappings that
// are needed for a document are not available on the replica at the
// time of indexing it
final List<String> nodeNames = internalCluster().startNodes(2,
Settings.builder()
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // explicitly set so it won't default to publish timeout
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait post commit as we are blocking things by design
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // TODO: convert test to support Zen2
.build());
final List<String> nodeNames = internalCluster().startNodes(2);
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("2").get().isTimedOut());

final String master = internalCluster().getMasterName();
Expand Down Expand Up @@ -359,19 +331,10 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
BlockClusterStateProcessing disruption = new BlockClusterStateProcessing(otherNode, random());
internalCluster().setDisruptionScheme(disruption);
disruption.startDisrupting();
final AtomicReference<Object> putMappingResponse = new AtomicReference<>();
client().admin().indices().preparePutMapping("index").setType("type").setSource("field", "type=long").execute(
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
putMappingResponse.set(response);
}
final ActionFuture<AcknowledgedResponse> putMappingResponse =
executeAndCancelCommittedPublication(client().admin().indices().preparePutMapping("index")
.setType("type").setSource("field", "type=long"));

@Override
public void onFailure(Exception e) {
putMappingResponse.set(e);
}
});
final Index index = resolveIndex("index");
// Wait for mappings to be available on master
assertBusy(() -> {
Expand All @@ -384,25 +347,17 @@ public void onFailure(Exception e) {
assertNotNull(mapper.mappers().getMapper("field"));
});

final AtomicReference<Object> docIndexResponse = new AtomicReference<>();
client().prepareIndex("index", "type", "1").setSource("field", 42).execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
docIndexResponse.set(response);
}

@Override
public void onFailure(Exception e) {
docIndexResponse.set(e);
}
});
final ActionFuture<IndexResponse> docIndexResponse = client().prepareIndex("index", "type", "1").setSource("field", 42).execute();

assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists()));

// index another document, this time using dynamic mappings.
// The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even
// if the dynamic mapping update is not applied on the replica yet.
ActionFuture<IndexResponse> dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute();
// this request changes the cluster state, because the mapping is dynamic,
// so we need to await and cancel the committed publication
ActionFuture<IndexResponse> dynamicMappingsFut =
executeAndCancelCommittedPublication(client().prepareIndex("index", "type", "2").setSource("field2", 42));

// ...and wait for second mapping to be available on master
assertBusy(() -> {
Expand All @@ -421,22 +376,18 @@ public void onFailure(Exception e) {
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
// and not just because it takes time to replicate the indexing request to the replica
Thread.sleep(100);
assertThat(putMappingResponse.get(), equalTo(null));
assertThat(docIndexResponse.get(), equalTo(null));
assertFalse(putMappingResponse.isDone());
assertFalse(docIndexResponse.isDone());

// Now make sure the indexing request finishes successfully
disruption.stopDisrupting();
assertBusy(() -> {
assertThat(putMappingResponse.get(), instanceOf(AcknowledgedResponse.class));
AcknowledgedResponse resp = (AcknowledgedResponse) putMappingResponse.get();
assertTrue(resp.isAcknowledged());
assertThat(docIndexResponse.get(), instanceOf(IndexResponse.class));
IndexResponse docResp = (IndexResponse) docIndexResponse.get();
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
assertTrue(putMappingResponse.get(10, TimeUnit.SECONDS).isAcknowledged());
assertThat(docIndexResponse.get(10, TimeUnit.SECONDS), instanceOf(IndexResponse.class));
assertEquals(2, docIndexResponse.get(10, TimeUnit.SECONDS).getShardInfo().getTotal()); // both shards should have succeeded
});

assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED));
assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED));
}

}