diff --git a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java index 148a1dea3095f..a4767507ef1aa 100644 --- a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java +++ b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateTaskListener.java @@ -25,7 +25,11 @@ public interface AckedClusterStateTaskListener extends ClusterStateTaskListener { /** - * Called to determine which nodes the acknowledgement is expected from + * Called to determine which nodes the acknowledgement is expected from. + * + * As this method will be called multiple times to determine the set of acking nodes, + * it is crucial for it to return consistent results: Given the same listener instance + * and the same node parameter, the method implementation should return the same result. * * @param discoveryNode a node * @return true if the node is expected to send ack back, false otherwise diff --git a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java index faf2f30bb3ed4..8d61fe964265d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java +++ b/server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -61,7 +61,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) { * @param e optional error that might have been thrown */ public void onAllNodesAcked(@Nullable Exception e) { - listener.onResponse(newResponse(true)); + listener.onResponse(newResponse(e == null)); } protected abstract Response newResponse(boolean acknowledged); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java index b8e898cf6f5e3..82d947b4158a2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java @@ -363,7 +363,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) { @Override public void onAllNodesAcked(@Nullable Exception e) { - listener.onResponse(new ClusterStateUpdateResponse(true)); + listener.onResponse(new ClusterStateUpdateResponse(e == null)); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 54a6568af3fa2..1757548c28b09 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -563,7 +563,7 @@ private static class AckCountDownListener implements Discovery.AckListener { private final AckedClusterStateTaskListener ackedTaskListener; private final CountDown countDown; - private final DiscoveryNodes nodes; + private final DiscoveryNode masterNode; private final long clusterStateVersion; private final Future ackTimeoutCallback; private Exception lastFailure; @@ -572,15 +572,14 @@ private static class AckCountDownListener implements Discovery.AckListener { ThreadPool threadPool) { this.ackedTaskListener = ackedTaskListener; this.clusterStateVersion = clusterStateVersion; - this.nodes = nodes; + this.masterNode = nodes.getMasterNode(); int countDown = 0; for (DiscoveryNode node : nodes) { - if (ackedTaskListener.mustAck(node)) { + //we always wait for at least the master node + if (node.equals(masterNode) || ackedTaskListener.mustAck(node)) { countDown++; } } - //we always wait for at least 1 node (the master) - countDown = Math.max(1, countDown); logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion); this.countDown = new CountDown(countDown); this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, () -> onTimeout()); @@ -588,11 +587,8 @@ private static class AckCountDownListener implements Discovery.AckListener { @Override public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { - if (!ackedTaskListener.mustAck(node)) { - //we always wait for the master ack anyway - if (!node.equals(nodes.getMasterNode())) { - return; - } + if (node.equals(masterNode) == false && ackedTaskListener.mustAck(node) == false) { + return; } if (e == null) { logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterStateVersion); diff --git a/server/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java b/server/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java index ab3f82fff75f5..a11ceddf28788 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ack/AckClusterUpdateSettingsIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -33,8 +34,16 @@ import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.discovery.zen.PublishClusterStateAction; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import java.util.Arrays; +import java.util.Collection; +import java.util.stream.Stream; import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -43,6 +52,11 @@ @ClusterScope(scope = TEST, minNumDataNodes = 2) public class AckClusterUpdateSettingsIT extends ESIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -156,4 +170,32 @@ public void testOpenIndexNoAcknowledgement() { assertThat(openIndexResponse.isAcknowledged(), equalTo(false)); ensureGreen("test"); // make sure that recovery from disk has completed, so that check index doesn't fail. } + + public void testAckingFailsIfNotPublishedToAllNodes() { + String masterNode = internalCluster().getMasterName(); + String nonMasterNode = Stream.of(internalCluster().getNodeNames()) + .filter(node -> node.equals(masterNode) == false).findFirst().get(); + + MockTransportService masterTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, masterNode); + MockTransportService nonMasterTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode); + + logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode); + if (randomBoolean() && internalCluster().numMasterNodes() != 2) { + masterTransportService.addFailToSendNoConnectRule(nonMasterTransportService, PublishClusterStateAction.SEND_ACTION_NAME); + } else { + masterTransportService.addFailToSendNoConnectRule(nonMasterTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME); + } + + CreateIndexResponse response = client().admin().indices().prepareCreate("test").get(); + assertFalse(response.isAcknowledged()); + + logger.info("waiting for cluster to reform"); + masterTransportService.clearRule(nonMasterTransportService); + + ensureStableCluster(internalCluster().size()); + + assertAcked(client().admin().indices().prepareDelete("test")); + } }