Skip to content

Commit

Permalink
Local node is discovered when cluster fails (elastic#43316)
Browse files Browse the repository at this point in the history
Today the `ClusterFormationFailureHelper` does not include the local node in
the list of nodes it claims to have discovered. This means that it sometimes
reports that it has not discovered a quorum when in fact it has. This commit
adds the local node to the set of discovered nodes.
  • Loading branch information
DaveCTurner committed Jun 18, 2019
1 parent 2e064e0 commit 90a8589
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID;
Expand Down Expand Up @@ -197,7 +198,8 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(settings, getStateForMasterService(), peerFinder.getLastResolvedAddresses(),
StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false).collect(Collectors.toList()), getCurrentTerm());
Stream.concat(Stream.of(getLocalNode()), StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false))
.collect(Collectors.toList()), getCurrentTerm());
}

private void onLeaderFailure(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -1258,6 +1260,79 @@ public void testClusterRecoversAfterExceptionDuringSerialization() {
}
}

public void testLogsWarningPeriodicallyIfClusterNotFormed() {
final long warningDelayMillis;
final Settings settings;
if (randomBoolean()) {
settings = Settings.EMPTY;
warningDelayMillis = ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings).millis();
} else {
warningDelayMillis = randomLongBetween(1, 100000);
settings = Settings.builder()
.put(ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.getKey(), warningDelayMillis + "ms")
.build();
}
logger.info("--> emitting warnings every [{}ms]", warningDelayMillis);

final Cluster cluster = new Cluster(3, true, settings);
cluster.runRandomly();
cluster.stabilise();

logger.info("--> disconnecting all nodes");

for (final ClusterNode clusterNode : cluster.clusterNodes) {
clusterNode.disconnect();
}

cluster.runFor(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING),
"waiting for leader failure");

for (int i = scaledRandomIntBetween(1, 10); i >= 0; i--) {
final MockLogAppender mockLogAppender;
try {
mockLogAppender = new MockLogAppender();
} catch (IllegalAccessException e) {
throw new AssertionError(e);
}

try {
Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
mockLogAppender.start();
mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() {
final Set<DiscoveryNode> nodesLogged = new HashSet<>();

@Override
public void match(LogEvent event) {
final String message = event.getMessage().getFormattedMessage();
assertThat(message,
startsWith("master not discovered or elected yet, an election requires at least 2 nodes with ids from ["));

final List<ClusterNode> matchingNodes = cluster.clusterNodes.stream()
.filter(n -> event.getContextData().<String>getValue(NODE_ID_LOG_CONTEXT_KEY)
.equals(getNodeIdForLogContext(n.getLocalNode()))).collect(Collectors.toList());
assertThat(matchingNodes, hasSize(1));

assertTrue(Regex.simpleMatch("*have discovered *" + matchingNodes.get(0).toString() + "*discovery will continue*",
message));

nodesLogged.add(matchingNodes.get(0).getLocalNode());
}

@Override
public void assertMatched() {
assertThat(nodesLogged + " vs " + cluster.clusterNodes, nodesLogged,
equalTo(cluster.clusterNodes.stream().map(ClusterNode::getLocalNode).collect(Collectors.toSet())));
}
});
cluster.runFor(warningDelayMillis, "waiting for warning to be emitted");
mockLogAppender.assertAllExpectationsMatched();
} finally {
mockLogAppender.stop();
Loggers.removeAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
}
}
}

private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
Expand Down Expand Up @@ -2183,12 +2258,18 @@ private List<TransportAddress> provideSeedHosts(HostsResolver ignored) {
}
}

private static final String NODE_ID_LOG_CONTEXT_KEY = "nodeId";

private static String getNodeIdForLogContext(DiscoveryNode node) {
return "{" + node.getId() + "}{" + node.getEphemeralId() + "}";
}

public static Runnable onNodeLog(DiscoveryNode node, Runnable runnable) {
final String nodeId = "{" + node.getId() + "}{" + node.getEphemeralId() + "}";
final String nodeId = getNodeIdForLogContext(node);
return new Runnable() {
@Override
public void run() {
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("nodeId", nodeId)) {
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put(NODE_ID_LOG_CONTEXT_KEY, nodeId)) {
runnable.run();
}
}
Expand Down

0 comments on commit 90a8589

Please sign in to comment.