Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only ack cluster state updates successfully applied on all nodes #30672

Merged
merged 3 commits into from
May 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
public interface AckedClusterStateTaskListener extends ClusterStateTaskListener {

/**
* Called to determine which nodes the acknowledgement is expected from
* Called to determine which nodes the acknowledgement is expected from.
*
* As this method will be called multiple times to determine the set of acking nodes,
* it is crucial for it to return consistent results: Given the same listener instance
* and the same node parameter, the method implementation should return the same result.
*
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {
* @param e optional error that might have been thrown
*/
public void onAllNodesAcked(@Nullable Exception e) {
listener.onResponse(newResponse(true));
listener.onResponse(newResponse(e == null));
}

protected abstract Response newResponse(boolean acknowledged);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public boolean mustAck(DiscoveryNode discoveryNode) {

@Override
public void onAllNodesAcked(@Nullable Exception e) {
listener.onResponse(new ClusterStateUpdateResponse(true));
listener.onResponse(new ClusterStateUpdateResponse(e == null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ private static class AckCountDownListener implements Discovery.AckListener {

private final AckedClusterStateTaskListener ackedTaskListener;
private final CountDown countDown;
private final DiscoveryNodes nodes;
private final DiscoveryNode masterNode;
private final long clusterStateVersion;
private final Future<?> ackTimeoutCallback;
private Exception lastFailure;
Expand All @@ -572,27 +572,23 @@ private static class AckCountDownListener implements Discovery.AckListener {
ThreadPool threadPool) {
this.ackedTaskListener = ackedTaskListener;
this.clusterStateVersion = clusterStateVersion;
this.nodes = nodes;
this.masterNode = nodes.getMasterNode();
int countDown = 0;
for (DiscoveryNode node : nodes) {
if (ackedTaskListener.mustAck(node)) {
//we always wait for at least the master node
if (node.equals(masterNode) || ackedTaskListener.mustAck(node)) {
countDown++;
}
}
//we always wait for at least 1 node (the master)
countDown = Math.max(1, countDown);
logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, clusterStateVersion);
this.countDown = new CountDown(countDown);
this.ackTimeoutCallback = threadPool.schedule(ackedTaskListener.ackTimeout(), ThreadPool.Names.GENERIC, () -> onTimeout());
}

@Override
public void onNodeAck(DiscoveryNode node, @Nullable Exception e) {
if (!ackedTaskListener.mustAck(node)) {
//we always wait for the master ack anyway
if (!node.equals(nodes.getMasterNode())) {
return;
}
if (node.equals(masterNode) == false && ackedTaskListener.mustAck(node) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It worries me a little that ackedTaskListener.mustAck(node) may give a different response here from the call in the constructor. Can we keep a set of the expected-to-ack nodes instead? Maybe only for the purposes of an assertion, or maybe check membership of the set instead of this expression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems a little overkill to me, especially as we only have very few implementations of this, which are all pretty straightforward. Keeping an additional set of nodes and checking it also seems computationally and memory-wise more expensive as we can potentially have a large number of listeners. I'm wondering if this just adds cruft without any real benefit. How about adding an additional sentence to the Javadocs of mustAck that states that implementations of this method must, given the same listener instance and the same parameter return the same result?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'm ok with it going in the docs.

return;
}
if (e == null) {
logger.trace("ack received from node [{}], cluster_state update (version: {})", node, clusterStateVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -33,8 +34,16 @@
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.PublishClusterStateAction;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Stream;

import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand All @@ -43,6 +52,11 @@
@ClusterScope(scope = TEST, minNumDataNodes = 2)
public class AckClusterUpdateSettingsIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand Down Expand Up @@ -156,4 +170,32 @@ public void testOpenIndexNoAcknowledgement() {
assertThat(openIndexResponse.isAcknowledged(), equalTo(false));
ensureGreen("test"); // make sure that recovery from disk has completed, so that check index doesn't fail.
}

public void testAckingFailsIfNotPublishedToAllNodes() {
String masterNode = internalCluster().getMasterName();
String nonMasterNode = Stream.of(internalCluster().getNodeNames())
.filter(node -> node.equals(masterNode) == false).findFirst().get();

MockTransportService masterTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, masterNode);
MockTransportService nonMasterTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, nonMasterNode);

logger.info("blocking cluster state publishing from master [{}] to non master [{}]", masterNode, nonMasterNode);
if (randomBoolean() && internalCluster().numMasterNodes() != 2) {
masterTransportService.addFailToSendNoConnectRule(nonMasterTransportService, PublishClusterStateAction.SEND_ACTION_NAME);
} else {
masterTransportService.addFailToSendNoConnectRule(nonMasterTransportService, PublishClusterStateAction.COMMIT_ACTION_NAME);
}

CreateIndexResponse response = client().admin().indices().prepareCreate("test").get();
assertFalse(response.isAcknowledged());

logger.info("waiting for cluster to reform");
masterTransportService.clearRule(nonMasterTransportService);

ensureStableCluster(internalCluster().size());

assertAcked(client().admin().indices().prepareDelete("test"));
}
}