From cdc7a2fbe8c06a0ab11b35bd2142ca3ecdc0a0ba Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 18 Oct 2022 12:33:43 +0530 Subject: [PATCH] Fix decommission status update to non leader nodes (#4800) * Fix decommission status update to non leader nodes Signed-off-by: Rishab Nahata --- CHANGELOG.md | 2 + .../AwarenessAttributeDecommissionIT.java | 164 ++++++++++++++++++ .../DecommissionAttributeMetadata.java | 21 +-- .../decommission/DecommissionController.java | 8 +- 4 files changed, 179 insertions(+), 16 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f6e5425f37db..bc22e66fe11ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -138,7 +138,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696)) - Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774)) - Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786)) +- Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800)) - Fix recovery path for searchable snapshots ([4813](https://github.com/opensearch-project/OpenSearch/pull/4813)) + ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java new file mode 100644 index 0000000000000..b8318503ee4a5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.opensearch.test.NodeRoles.onlyRole; +import static org.opensearch.test.OpenSearchIntegTestCase.client; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class AwarenessAttributeDecommissionIT extends OpenSearchIntegTestCase { + private final Logger logger = LogManager.getLogger(AwarenessAttributeDecommissionIT.class); + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockTransportService.TestPlugin.class); + } + + @After + public void cleanup() throws Exception { + assertNoTimeout(client().admin().cluster().prepareHealth().get()); + } + + public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'"); + List clusterManagerNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + .build() + ); + + logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'"); + List dataNodes = internalCluster().startNodes( + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "a") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "b") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build(), + Settings.builder() + .put(commonSettings) + .put("node.attr.zone", "c") + .put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE)) + .build() + ); + + ensureStableCluster(6); + + logger.info("--> starting decommissioning nodes in zone {}", 'c'); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); + DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); + assertTrue(decommissionResponse.isAcknowledged()); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // assert that decommission status is successful + GetDecommissionStateResponse response = client().execute(GetDecommissionStateAction.INSTANCE, new GetDecommissionStateRequest()) + .get(); + assertEquals(response.getDecommissionedAttribute(), decommissionAttribute); + assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); + + ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState(); + assertEquals(4, clusterState.nodes().getSize()); + + // assert status on nodes that are part of cluster currently + Iterator discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt(); + while (discoveryNodeIterator.hasNext()) { + // assert no node has decommissioned attribute + DiscoveryNode node = discoveryNodeIterator.next(); + assertNotEquals(node.getAttributes().get("zone"), "c"); + + // assert all the nodes has status as SUCCESSFUL + ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName()); + assertEquals( + localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), + DecommissionStatus.SUCCESSFUL + ); + } + + // assert status on decommissioned node + // Here we will verify that until it got kicked out, it received appropriate status updates + // decommissioned nodes hence will have status as IN_PROGRESS as it will be kicked out later after this + // and won't receive status update to SUCCESSFUL + String randomDecommissionedNode = randomFrom(clusterManagerNodes.get(2), dataNodes.get(2)); + ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, randomDecommissionedNode); + assertEquals( + decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), + DecommissionStatus.IN_PROGRESS + ); + + // Will wait for all events to complete + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute( + DeleteDecommissionStateAction.INSTANCE, + new DeleteDecommissionStateRequest() + ).get(); + assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + + // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes) + // as by then all nodes should have joined the cluster + ensureStableCluster(6, TimeValue.timeValueMinutes(2)); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java index dbb3fea823eb6..d3d508bf36451 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java @@ -79,35 +79,29 @@ public DecommissionStatus status() { /** * Returns instance of the metadata with updated status * @param newStatus status to be updated with - * @return instance with valid status */ // synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe - public synchronized DecommissionAttributeMetadata setUpdatedStatus(DecommissionStatus newStatus) { - // if the current status is the expected status already, we return the same instance - if (newStatus.equals(status)) { - return this; + public synchronized void validateNewStatus(DecommissionStatus newStatus) { + // if the current status is the expected status already or new status is FAILED, we let the check pass + if (newStatus.equals(status) || newStatus.equals(DecommissionStatus.FAILED)) { + return; } // We don't expect that INIT will be new status, as it is registered only when starting the decommission action switch (newStatus) { case IN_PROGRESS: - validateAndSetStatus(DecommissionStatus.INIT, newStatus); + validateStatus(DecommissionStatus.INIT, newStatus); break; case SUCCESSFUL: - validateAndSetStatus(DecommissionStatus.IN_PROGRESS, newStatus); - break; - case FAILED: - // we don't need to validate here and directly update status to FAILED - this.status = newStatus; + validateStatus(DecommissionStatus.IN_PROGRESS, newStatus); break; default: throw new IllegalArgumentException( "illegal decommission status [" + newStatus.status() + "] requested for updating metadata" ); } - return this; } - private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatus next) { + private void validateStatus(DecommissionStatus expected, DecommissionStatus next) { if (status.equals(expected) == false) { assert false : "can't move decommission status to [" + next @@ -120,7 +114,6 @@ private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatu "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])" ); } - status = next; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 7719012f2f3d7..b58d99a9d59db 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -246,8 +246,12 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.status(), decommissionStatus ); - // setUpdatedStatus can throw IllegalStateException if the sequence of update is not valid - decommissionAttributeMetadata.setUpdatedStatus(decommissionStatus); + // validateNewStatus can throw IllegalStateException if the sequence of update is not valid + decommissionAttributeMetadata.validateNewStatus(decommissionStatus); + decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttributeMetadata.decommissionAttribute(), + decommissionStatus + ); return ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build();