diff --git a/CHANGELOG.md b/CHANGELOG.md index e35d8c3eda0b3..04b62c11a6865 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261)) - Improve Gradle pre-commit checks to pre-empt Jenkins build ([#4660](https://github.com/opensearch-project/OpenSearch/pull/4660)) - Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661)) +- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590)) - Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 7ac716084793d..fbb345ea3a441 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -105,6 +105,7 @@ import java.util.stream.StreamSupport; import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID; +import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; @@ -138,6 +139,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final Settings settings; private final boolean singleNodeDiscovery; + private volatile boolean localNodeCommissioned; private final ElectionStrategy electionStrategy; private final TransportService transportService; private final ClusterManagerService clusterManagerService; @@ -218,7 +220,8 @@ public Coordinator( this::joinLeaderInTerm, this.onJoinValidators, rerouteService, - nodeHealthService + nodeHealthService, + this::onNodeCommissionStatusChange ); this.persistedStateSupplier = persistedStateSupplier; this.noClusterManagerBlockService = new NoClusterManagerBlockService(settings, clusterSettings); @@ -281,6 +284,7 @@ public Coordinator( joinHelper::logLastFailedJoinAttempt ); this.nodeHealthService = nodeHealthService; + this.localNodeCommissioned = true; } private ClusterFormationState getClusterFormationState() { @@ -596,6 +600,9 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinRequest.getSourceNode().getVersion(), stateForJoinValidation.getNodes().getMinNodeVersion() ); + // we are checking source node commission status here to reject any join request coming from a decommissioned node + // even before executing the join task to fail fast + JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata()); } sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback); } else { @@ -1424,6 +1431,17 @@ protected void onFoundPeersUpdated() { } } + // package-visible for testing + synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) { + this.localNodeCommissioned = localNodeCommissioned; + peerFinder.onNodeCommissionStatusChange(localNodeCommissioned); + } + + // package-visible for testing + boolean localNodeCommissioned() { + return localNodeCommissioned; + } + private void startElectionScheduler() { assert electionScheduler == null : electionScheduler; @@ -1450,6 +1468,14 @@ public void run() { return; } + // if either the localNodeCommissioned flag or the last accepted state thinks it should skip pre voting, we will + // acknowledge it + if (nodeCommissioned(lastAcceptedState.nodes().getLocalNode(), lastAcceptedState.metadata()) == false + || localNodeCommissioned == false) { + logger.debug("skip prevoting as local node is decommissioned"); + return; + } + if (prevotingRound != null) { prevotingRound.close(); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java index 656e6d220720f..a66152b8016ee 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinHelper.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.ClusterStateTaskListener; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.coordination.Coordinator.Mode; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RerouteService; @@ -57,6 +58,7 @@ import org.opensearch.monitor.StatusInfo; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; +import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportRequest; @@ -78,6 +80,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -118,6 +121,7 @@ public class JoinHelper { private final AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); private final Supplier joinTaskExecutorGenerator; + private final Consumer nodeCommissioned; JoinHelper( Settings settings, @@ -130,12 +134,14 @@ public class JoinHelper { Function joinLeaderInTerm, Collection> joinValidators, RerouteService rerouteService, - NodeHealthService nodeHealthService + NodeHealthService nodeHealthService, + Consumer nodeCommissioned ) { this.clusterManagerService = clusterManagerService; this.transportService = transportService; this.nodeHealthService = nodeHealthService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); + this.nodeCommissioned = nodeCommissioned; this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService, transportService) { private final long term = currentTermSupplier.getAsLong(); @@ -342,6 +348,7 @@ public void handleResponse(Empty response) { pendingOutgoingJoins.remove(dedupKey); logger.debug("successfully joined {} with {}", destination, joinRequest); lastFailedJoinAttempt.set(null); + nodeCommissioned.accept(true); onCompletion.run(); } @@ -352,6 +359,13 @@ public void handleException(TransportException exp) { FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); attempt.logNow(); lastFailedJoinAttempt.set(attempt); + if (exp instanceof RemoteTransportException && (exp.getCause() instanceof NodeDecommissionedException)) { + logger.info( + "local node is decommissioned [{}]. Will not be able to join the cluster", + exp.getCause().getMessage() + ); + nodeCommissioned.accept(false); + } onCompletion.run(); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 814aa17255931..ac237db85ee5b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -39,9 +39,6 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.block.ClusterBlocks; -import org.opensearch.cluster.decommission.DecommissionAttribute; -import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; -import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; @@ -64,6 +61,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; +import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; /** @@ -196,6 +194,9 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices // we have to reject nodes that don't support all indices we have in this cluster ensureIndexCompatibility(node.getVersion(), currentState.getMetadata()); + // we have added the same check in handleJoinRequest method and adding it here as this method + // would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness + ensureNodeCommissioned(node, currentState.metadata()); nodesBuilder.add(node); nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); @@ -203,7 +204,7 @@ public ClusterTasksResult execute(ClusterState currentState, List jo if (node.isClusterManagerNode()) { joiniedNodeNameIds.put(node.getName(), node.getId()); } - } catch (IllegalArgumentException | IllegalStateException e) { + } catch (IllegalArgumentException | IllegalStateException | NodeDecommissionedException e) { results.failure(joinTask, e); continue; } @@ -477,22 +478,13 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version } public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) { - DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); - if (decommissionAttributeMetadata != null) { - DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); - DecommissionStatus status = decommissionAttributeMetadata.status(); - if (decommissionAttribute != null && status != null) { - // We will let the node join the cluster if the current status is in FAILED state - if (node.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue()) - && (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) { - throw new NodeDecommissionedException( - "node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]", - node.toString(), - decommissionAttribute.toString(), - status.status() - ); - } - } + if (nodeCommissioned(node, metadata) == false) { + throw new NodeDecommissionedException( + "node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]", + node.toString(), + metadata.decommissionAttributeMetadata().decommissionAttribute().toString(), + metadata.decommissionAttributeMetadata().status().status() + ); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index fcab411f073ba..5def2733b5ded 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -389,10 +389,6 @@ private Set filterNodesWithDecommissionAttribute( return nodesWithDecommissionAttribute; } - private static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { - return discoveryNode.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue()); - } - private static void validateAwarenessAttribute( final DecommissionAttribute decommissionAttribute, List awarenessAttributes, @@ -531,4 +527,38 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }); } + + /** + * Utility method to check if the node has decommissioned attribute + * + * @param discoveryNode node to check on + * @param decommissionAttribute attribute to be checked with + * @return true or false based on whether node has decommissioned attribute + */ + public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { + String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName()); + return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue()); + } + + /** + * Utility method to check if the node is commissioned or not + * + * @param discoveryNode node to check on + * @param metadata metadata present current which will be used to check the commissioning status of the node + * @return if the node is commissioned or not + */ + public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) { + DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null) { + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + DecommissionStatus status = decommissionAttributeMetadata.status(); + if (decommissionAttribute != null && status != null) { + if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) + && (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) { + return false; + } + } + } + return true; + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 1665614c18496..54579031aac08 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -534,6 +534,7 @@ public void apply(Settings value, Settings current, Settings previous) { PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, + PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, ClusterFormationFailureHelper.DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING, ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/discovery/PeerFinder.java b/server/src/main/java/org/opensearch/discovery/PeerFinder.java index a601a6fbe4d82..e8b6c72c512a2 100644 --- a/server/src/main/java/org/opensearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/opensearch/discovery/PeerFinder.java @@ -84,6 +84,14 @@ public abstract class PeerFinder { Setting.Property.NodeScope ); + // the time between attempts to find all peers when node is in decommissioned state, default set to 2 minutes + public static final Setting DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING = Setting.timeSetting( + "discovery.find_peers_interval_during_decommission", + TimeValue.timeValueSeconds(120L), + TimeValue.timeValueMillis(1000), + Setting.Property.NodeScope + ); + public static final Setting DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING = Setting.timeSetting( "discovery.request_peers_timeout", TimeValue.timeValueMillis(3000), @@ -91,7 +99,8 @@ public abstract class PeerFinder { Setting.Property.NodeScope ); - private final TimeValue findPeersInterval; + private final Settings settings; + private TimeValue findPeersInterval; private final TimeValue requestPeersTimeout; private final Object mutex = new Object(); @@ -112,6 +121,7 @@ public PeerFinder( TransportAddressConnector transportAddressConnector, ConfiguredHostsResolver configuredHostsResolver ) { + this.settings = settings; findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings); requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings); this.transportService = transportService; @@ -128,6 +138,23 @@ public PeerFinder( ); } + public synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) { + findPeersInterval = localNodeCommissioned + ? DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings) + : DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING.get(settings); + logger.info( + "setting findPeersInterval to [{}] as node commission status = [{}] for local node [{}]", + findPeersInterval, + localNodeCommissioned, + transportService.getLocalNode() + ); + } + + // package private for tests + TimeValue getFindPeersInterval() { + return findPeersInterval; + } + public void activate(final DiscoveryNodes lastAcceptedNodes) { logger.trace("activating with {}", lastAcceptedNodes); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java index d96c972bc6021..74c5d0fcccbed 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java @@ -87,6 +87,7 @@ import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_SETTING; import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_WRITES; import static org.opensearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION; +import static org.opensearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING; import static org.opensearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; @@ -1780,6 +1781,48 @@ public void testImproveConfigurationPerformsVotingConfigExclusionStateCheck() { } } + public void testLocalNodeAlwaysCommissionedWithoutDecommissionedException() { + try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) { + cluster.runRandomly(); + cluster.stabilise(); + for (ClusterNode node : cluster.clusterNodes) { + assertTrue(node.coordinator.localNodeCommissioned()); + } + } + } + + public void testClusterStabilisesForPreviouslyDecommissionedNode() { + try (Cluster cluster = new Cluster(randomIntBetween(1, 5))) { + cluster.runRandomly(); + cluster.stabilise(); + for (ClusterNode node : cluster.clusterNodes) { + assertTrue(node.coordinator.localNodeCommissioned()); + } + final ClusterNode leader = cluster.getAnyLeader(); + + ClusterNode decommissionedNode = cluster.new ClusterNode( + nextNodeIndex.getAndIncrement(), true, leader.nodeSettings, () -> new StatusInfo(HEALTHY, "healthy-info") + ); + decommissionedNode.coordinator.onNodeCommissionStatusChange(false); + cluster.clusterNodes.add(decommissionedNode); + + assertFalse(decommissionedNode.coordinator.localNodeCommissioned()); + + cluster.stabilise( + // Interval is updated to decommissioned find peer interval + defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_DURING_DECOMMISSION_SETTING) + // One message delay to send a join + + DEFAULT_DELAY_VARIABILITY + // Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a + // followup reconfiguration + + 3 * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY + ); + + // once cluster stabilises the node joins and would be commissioned + assertTrue(decommissionedNode.coordinator.localNodeCommissioned()); + } + } + private ClusterState buildNewClusterStateWithVotingConfigExclusion( ClusterState currentState, Set newVotingConfigExclusion diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java index a3c945cdbac3a..7b21042b2ed4a 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinHelperTests.java @@ -90,7 +90,8 @@ public void testJoinDeduplication() { startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, - () -> new StatusInfo(HEALTHY, "info") + () -> new StatusInfo(HEALTHY, "info"), + nodeCommissioned -> {} ); transportService.start(); @@ -230,7 +231,8 @@ private void assertJoinValidationRejectsMismatchedClusterUUID(String actionName, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, - null + null, + nodeCommissioned -> {} ); // registers request handler transportService.start(); transportService.acceptIncomingRequests(); @@ -284,7 +286,8 @@ public void testJoinFailureOnUnhealthyNodes() { startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, - () -> nodeHealthServiceStatus.get() + () -> nodeHealthServiceStatus.get(), + nodeCommissioned -> {} ); transportService.start(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 52922bc6a0e83..66a3b00f2979d 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -261,6 +261,51 @@ public void testJoinClusterWithDifferentDecommission() { JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata); } + public void testJoinFailedForDecommissionedNode() throws Exception { + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, rerouteService, null); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.SUCCESSFUL + ); + final ClusterState clusterManagerClusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .metadata(Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata)) + .build(); + + final DiscoveryNode decommissionedNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", "zone1"), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + String decommissionedNodeID = decommissionedNode.getId(); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterManagerClusterState, + List.of(new JoinTaskExecutor.Task(decommissionedNode, "test")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertFalse(taskResult.isSuccess()); + assertTrue(taskResult.getFailure() instanceof NodeDecommissionedException); + assertFalse(result.resultingState.getNodes().nodeExists(decommissionedNodeID)); + } + public void testJoinClusterWithDecommissionFailed() { Settings.builder().build(); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java index c77baba5fe167..18a7b892a424c 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NodeJoinTests.java @@ -39,6 +39,10 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -775,6 +779,60 @@ public void testJoinElectedLeaderWithDeprecatedMasterRole() { assertTrue(clusterStateHasNode(node1)); } + public void testJoinFailsWhenDecommissioned() { + DiscoveryNode node0 = newNode(0, true); + DiscoveryNode node1 = newNode(1, true); + long initialTerm = randomLongBetween(1, 10); + long initialVersion = randomLongBetween(1, 10); + setupFakeClusterManagerServiceAndCoordinator( + initialTerm, + initialStateWithDecommissionedAttribute( + initialState(node0, initialTerm, initialVersion, VotingConfiguration.of(node0)), + new DecommissionAttribute("zone", "zone1") + ), + () -> new StatusInfo(HEALTHY, "healthy-info") + ); + assertFalse(isLocalNodeElectedMaster()); + long newTerm = initialTerm + randomLongBetween(1, 10); + joinNodeAndRun(new JoinRequest(node0, newTerm, Optional.of(new Join(node0, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + assertFalse(clusterStateHasNode(node1)); + joinNodeAndRun(new JoinRequest(node1, newTerm, Optional.of(new Join(node1, node0, newTerm, initialTerm, initialVersion)))); + assertTrue(isLocalNodeElectedMaster()); + assertTrue(clusterStateHasNode(node1)); + DiscoveryNode decommissionedNode = new DiscoveryNode( + "data_2", + 2 + "", + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", "zone1"), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + long anotherTerm = newTerm + randomLongBetween(1, 10); + + assertThat( + expectThrows( + NodeDecommissionedException.class, + () -> joinNodeAndRun(new JoinRequest(decommissionedNode, anotherTerm, Optional.empty())) + ).getMessage(), + containsString("with current status of decommissioning") + ); + assertFalse(clusterStateHasNode(decommissionedNode)); + + DiscoveryNode node3 = new DiscoveryNode( + "data_3", + 3 + "", + buildNewFakeTransportAddress(), + Collections.singletonMap("zone", "zone2"), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + long termForNode3 = anotherTerm + randomLongBetween(1, 10); + + joinNodeAndRun(new JoinRequest(node3, termForNode3, Optional.empty())); + assertTrue(clusterStateHasNode(node3)); + } + private boolean isLocalNodeElectedMaster() { return MasterServiceTests.discoveryState(clusterManagerService).nodes().isLocalNodeElectedMaster(); } @@ -782,4 +840,17 @@ private boolean isLocalNodeElectedMaster() { private boolean clusterStateHasNode(DiscoveryNode node) { return node.equals(MasterServiceTests.discoveryState(clusterManagerService).nodes().get(node.getId())); } + + private static ClusterState initialStateWithDecommissionedAttribute( + ClusterState clusterState, + DecommissionAttribute decommissionAttribute + ) { + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.SUCCESSFUL + ); + return ClusterState.builder(clusterState) + .metadata(Metadata.builder(clusterState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) + .build(); + } } diff --git a/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java b/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java index 5e7dede0309c6..7e7bb2f0a2911 100644 --- a/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/opensearch/discovery/PeerFinderTests.java @@ -807,6 +807,42 @@ public void testReconnectsToDisconnectedNodes() { assertFoundPeers(rebootedOtherNode); } + public void testConnectionAttemptDuringDecommissioning() { + boolean localNodeCommissioned = randomBoolean(); + peerFinder.onNodeCommissionStatusChange(localNodeCommissioned); + + long findPeersInterval = peerFinder.getFindPeersInterval().millis(); + + final DiscoveryNode otherNode = newDiscoveryNode("node-1"); + providedAddresses.add(otherNode.getAddress()); + transportAddressConnector.addReachableNode(otherNode); + + peerFinder.activate(lastAcceptedNodes); + runAllRunnableTasks(); + assertFoundPeers(otherNode); + + transportAddressConnector.reachableNodes.clear(); + final DiscoveryNode newNode = new DiscoveryNode("new-node", otherNode.getAddress(), Version.CURRENT); + transportAddressConnector.addReachableNode(newNode); + + connectedNodes.remove(otherNode); + disconnectedNodes.add(otherNode); + + // peer discovery will be delayed now + if (localNodeCommissioned == false) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + assertPeersNotDiscovered(newNode); + } + + final long expectedTime = CONNECTION_TIMEOUT_MILLIS + findPeersInterval; + while (deterministicTaskQueue.getCurrentTimeMillis() < expectedTime) { + deterministicTaskQueue.advanceTime(); + runAllRunnableTasks(); + } + assertFoundPeers(newNode); + } + private void respondToRequests(Function responseFactory) { final CapturedRequest[] capturedRequests = capturingTransport.getCapturedRequestsAndClear(); for (final CapturedRequest capturedRequest : capturedRequests) { @@ -828,6 +864,16 @@ private void assertFoundPeers(DiscoveryNode... expectedNodesArray) { assertNotifiedOfAllUpdates(); } + private void assertPeersNotDiscovered(DiscoveryNode... undiscoveredNodesArray) { + final Set undiscoveredNodes = Arrays.stream(undiscoveredNodesArray).collect(Collectors.toSet()); + final List actualNodesList = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false) + .collect(Collectors.toList()); + final HashSet actualNodesSet = new HashSet<>(actualNodesList); + Set intersection = new HashSet<>(actualNodesSet); + intersection.retainAll(undiscoveredNodes); + assertEquals(intersection.size(), 0); + } + private void assertNotifiedOfAllUpdates() { final Stream actualNodes = StreamSupport.stream(peerFinder.getFoundPeers().spliterator(), false); final Stream notifiedNodes = StreamSupport.stream(foundPeersFromNotification.spliterator(), false);