diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index f20ea13e7668a..14200b231c309 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import java.io.Closeable; import java.io.IOException; @@ -50,10 +51,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -583,7 +582,7 @@ public void handlePrePublish(ClusterState clusterState) { // This is to ensure the remote store is the single source of truth for current state. Even if the current node // goes down after sending the cluster state to other nodes, we should be able to read the remote state and // recover the cluster. - if (isRemotePublicationEnabled) { + if (isRemoteStateEnabled) { assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState); } @@ -594,7 +593,7 @@ public void handlePrePublish(ClusterState clusterState) { */ public void handlePreCommit() { // Publishing the committed state to remote store before sending apply commit to other nodes. - if (isRemotePublicationEnabled) { + if (isRemoteStateEnabled) { assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null : "Remote state has not been initialized"; persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 914db31886850..87f02c6891be6 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -49,7 +49,6 @@ import org.opensearch.cluster.coordination.CoordinationState.VoteCollection; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; -import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 0aea421ca1dc3..f4e4241a8ceae 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,9 +31,6 @@ package org.opensearch.cluster.coordination; -import java.util.Locale; -import java.util.Optional; -import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -43,7 +40,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.Diff; import org.opensearch.cluster.IncompatibleClusterStateVersionException; -import org.opensearch.cluster.coordination.CoordinationState.PersistedState; import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -65,9 +61,7 @@ import java.io.IOException; import java.util.HashMap; -import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -238,7 +232,10 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish if (transportService.getLocalNode().equals(request.getSourceNode())) { return acceptStateOnLocalNode(request); } - ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(request.getClusterUUID(), request.getManifestFile()); + ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName( + request.getClusterUUID(), + request.getManifestFile() + ); boolean applyFullState = false; final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { @@ -253,17 +250,40 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish } if (applyFullState == true) { - logger.debug(() -> new ParameterizedMessage("Downloading full cluster state for term {}, version {}, stateUUID {}", manifest.getClusterTerm(), manifest.getStateVersion(), - manifest.getStateUUID())); - ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId(), true); + logger.debug( + () -> new ParameterizedMessage( + "Downloading full cluster state for term {}, version {}, stateUUID {}", + manifest.getClusterTerm(), + manifest.getStateVersion(), + manifest.getStateUUID() + ) + ); + ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest( + request.getClusterName(), + manifest, + transportService.getLocalNode().getId(), + true + ); fullClusterStateReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.set(clusterState); return response; } else { - logger.debug(() -> new ParameterizedMessage("Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", manifest.getClusterTerm(), - manifest.getStateVersion(), manifest.getDiffManifest().getFromStateUUID(), manifest.getStateUUID())); - ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen, transportService.getLocalNode().getId()); + logger.debug( + () -> new ParameterizedMessage( + "Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}", + manifest.getClusterTerm(), + manifest.getStateVersion(), + manifest.getDiffManifest().getFromStateUUID(), + manifest.getStateUUID() + ) + ); + ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff( + request.getClusterName(), + manifest, + lastSeen, + transportService.getLocalNode().getId() + ); compatibleClusterStateDiffReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.compareAndSet(lastSeen, clusterState); @@ -286,7 +306,8 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remotePublishRequest) { final PublishRequest publishRequest = currentPublishRequestToSelf.get(); - if (publishRequest == null || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term + if (publishRequest == null + || publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term || publishRequest.getAcceptedState().version() != remotePublishRequest.version) { throw new IllegalStateException("publication to self failed for " + remotePublishRequest); } @@ -295,9 +316,16 @@ private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remo return publishWithJoinResponse; } - public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, - PersistedStateRegistry persistedStateRegistry) { - final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled, persistedStateRegistry); + public PublicationContext newPublicationContext( + ClusterChangedEvent clusterChangedEvent, + boolean isRemotePublicationEnabled, + PersistedStateRegistry persistedStateRegistry + ) { + final PublicationContext publicationContext = new PublicationContext( + clusterChangedEvent, + isRemotePublicationEnabled, + persistedStateRegistry + ); // Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication // straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and @@ -345,7 +373,11 @@ public class PublicationContext { private final boolean sendRemoteState; private final PersistedStateRegistry persistedStateRegistry; - PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, PersistedStateRegistry persistedStateRegistry) { + PublicationContext( + ClusterChangedEvent clusterChangedEvent, + boolean isRemotePublicationEnabled, + PersistedStateRegistry persistedStateRegistry + ) { discoveryNodes = clusterChangedEvent.state().nodes(); newState = clusterChangedEvent.state(); previousState = clusterChangedEvent.previousState(); @@ -462,11 +494,22 @@ public String executor() { ); } - private void sendRemoteClusterState(final DiscoveryNode destination, final ClusterState clusterState, final ActionListener listener) { + private void sendRemoteClusterState( + final DiscoveryNode destination, + final ClusterState clusterState, + final ActionListener listener + ) { try { - final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastUploadedManifestFile(); - final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(), - clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifestFileName); + final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)) + .getLastUploadedManifestFile(); + final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( + discoveryNodes.getLocalNode(), + clusterState.term(), + clusterState.getVersion(), + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID(), + manifestFileName + ); final Consumer transportExceptionHandler = exp -> { logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp); listener.onFailure(exp); @@ -493,7 +536,13 @@ public String executor() { return ThreadPool.Names.GENERIC; } }; - transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler); + transportService.sendRequest( + destination, + PUBLISH_REMOTE_STATE_ACTION_NAME, + remotePublishRequest, + stateRequestOptions, + responseHandler + ); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java index 68dea53372c45..9461c5ee63627 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemotePublishRequest.java @@ -14,13 +14,24 @@ import java.io.IOException; +/** + * Send the publish request with the remote cluster state details + * @opensearch.internal + */ public class RemotePublishRequest extends TermVersionRequest { private final String clusterName; private final String clusterUUID; private final String manifestFile; - public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID, String manifestFile) { + public RemotePublishRequest( + DiscoveryNode sourceNode, + long term, + long version, + String clusterName, + String clusterUUID, + String manifestFile + ) { super(sourceNode, term, version); this.clusterName = clusterName; this.clusterUUID = clusterUUID; @@ -44,8 +55,20 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "RemotePublishRequest{" + "term=" + term + ", version=" + version + ", clusterName=" + clusterName + ", clusterUUID=" + clusterUUID - + ", sourceNode=" + sourceNode + ", manifestFile=" + manifestFile + '}'; + return "RemotePublishRequest{" + + "term=" + + term + + ", version=" + + version + + ", clusterName=" + + clusterName + + ", clusterUUID=" + + clusterUUID + + ", sourceNode=" + + sourceNode + + ", manifestFile=" + + manifestFile + + '}'; } public String getClusterName() { diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 6cf1f654e3939..f322106f2fe3b 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -33,7 +33,6 @@ package org.opensearch.cluster.node; import org.opensearch.Version; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.UUIDs; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.settings.Setting; @@ -44,7 +43,6 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.node.Node; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3ba6cebbe7350..09f32884e0ae1 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -77,7 +77,6 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; -import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -716,6 +715,9 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, + RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 1cc0c9cbb6642..cb3325f7462b1 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -722,9 +722,14 @@ public void setLastAcceptedState(ClusterState clusterState) { } else { assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true : "Previous manifest and previous ClusterState are not in sync"; - manifestDetails = remoteClusterStateService.writeIncrementalMetadata(lastAcceptedState, clusterState, lastAcceptedManifest); + manifestDetails = remoteClusterStateService.writeIncrementalMetadata( + lastAcceptedState, + clusterState, + lastAcceptedManifest + ); } - assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true : "Manifest and ClusterState are not in sync"; + assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true + : "Manifest and ClusterState are not in sync"; lastAcceptedManifest = manifestDetails.getClusterMetadataManifest(); lastAcceptedState = clusterState; lastUploadedManifestFile = manifestDetails.getManifestFileName(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index b3b1bf37f8696..d82ba0b859f10 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -332,6 +332,11 @@ public Map getCustomMetadataMap() { return uploadedCustomMetadataMap; } + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + public ClusterStateDiffManifest getDiffManifest() { + return new ClusterStateDiffManifest(); + } + public boolean hasMetadataAttributesFiles() { return uploadedCoordinationMetadata != null || uploadedSettingsMetadata != null @@ -991,4 +996,11 @@ public String toString() { + '}'; } } + + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + public static class ClusterStateDiffManifest { + public String getFromStateUUID() { + return null; + } + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 798553b72fef0..b59f475b90d6d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -1092,6 +1092,31 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID .build(); } + public ClusterState getClusterStateForManifest( + String clusterName, + ClusterMetadataManifest manifest, + String localNodeId, + boolean includeEphemeral + ) { + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + return null; + } + + public ClusterState getClusterStateUsingDiff( + String clusterName, + ClusterMetadataManifest manifest, + ClusterState previousClusterState, + String localNodeId + ) { + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + return null; + } + + public ClusterMetadataManifest getClusterMetadataManifestByFileName(String clusterUUID, String manifestFileName) { + // TODO https://github.com/opensearch-project/OpenSearch/pull/14089 + return null; + } + private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { String globalMetadataFileName = clusterMetadataManifest.getGlobalMetadataFileName(); try { diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 05714b4791d06..46d1d07db7eb6 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -202,7 +202,8 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { } public static boolean isRemotePublicationEnabled(Settings settings) { - return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings) + return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) + && isRemoteRoutingTableAttributePresent(settings) && isRemoteStoreClusterStateEnabled(settings); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 4ade19b3d7219..be9cec8ef5a25 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -979,6 +979,8 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID); assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); + Mockito.when(remoteClusterStateService.markLastStateAsCommitted(any(), any())) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); coordinationState.handlePreCommit(); ClusterState committedClusterState = ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).clusterUUIDCommitted(true).build()) diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 1d63caafbb4dd..16c6c38e4524f 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -724,7 +724,8 @@ public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); @@ -756,6 +757,9 @@ public void testRemotePersistedState() throws IOException { assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); + when(remoteClusterStateService.markLastStateAsCommitted(Mockito.any(), Mockito.any())).thenReturn( + new RemoteUploadDetails(manifest, "path/to/manifest") + ); remotePersistedState.markLastAcceptedStateAsCommitted(); Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any()); @@ -781,7 +785,8 @@ public void testRemotePersistedStateNotCommitted() throws IOException { .build(); Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any())) .thenReturn(Optional.of(manifest)); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())).thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) + .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(new RemoteUploadDetails(manifest, "path/to/manifest")); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4a53770c76d88..b4a7cb439cfb5 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -43,6 +43,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteUploadDetails; import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.indices.IndicesModule; @@ -182,8 +183,8 @@ public void teardown() throws Exception { public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); - Assert.assertThat(manifest, nullValue()); + final RemoteUploadDetails manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)); + Assert.assertThat(manifestDetails, nullValue()); } public void testFailInitializationWhenRemoteStateDisabled() { @@ -218,7 +219,8 @@ public void testWriteFullMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -262,7 +264,8 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -401,8 +404,8 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); - Assert.assertThat(manifest, nullValue()); + final RemoteUploadDetails manifestDetails = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); + Assert.assertThat(manifestDetails, nullValue()); assertEquals(0, remoteClusterStateService.getStats().getSuccessCount()); } @@ -433,7 +436,7 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { previousClusterState, clusterState, previousManifest - ); + ).getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final List indices = List.of(uploadedIndexMetadata); @@ -508,7 +511,7 @@ private void verifyCodecMigrationManifest(int previousCodec) throws IOException previousClusterState, newClusterState, previousManifest - ); + ).getClusterMetadataManifest(); // global metadata is updated assertThat(manifestAfterUpdate.hasMetadataAttributesFiles(), is(true)); @@ -545,7 +548,7 @@ private void verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(ClusterMe previousClusterState, clusterState, previousManifest - ); + ).getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .codecVersion(3) @@ -736,7 +739,8 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); ClusterState clusterState1 = ClusterState.builder(initialClusterState) .metadata( @@ -751,7 +755,7 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { initialClusterState, clusterState1, initialManifest - ); + ).getClusterMetadataManifest(); // remove custom1 from the cluster state, update custom2, custom3 is at it is, added custom4 ClusterState clusterState2 = ClusterState.builder(initialClusterState) .metadata( @@ -761,7 +765,8 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { .putCustom("custom4", new CustomMetadata1("mock_custom_metadata4")) ) .build(); - ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1); + ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1) + .getClusterMetadataManifest(); // custom1 is removed assertFalse(manifest2.getCustomMetadataMap().containsKey("custom1")); // custom2 is updated @@ -811,7 +816,8 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); String initialIndex = "test-index"; Index index1 = new Index("test-index-1", "index-uuid-1"); Index index2 = new Index("test-index-2", "index-uuid-2"); @@ -844,7 +850,7 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { initialClusterState, clusterState1, initialManifest - ); + ).getClusterMetadataManifest(); // verify that initial index is removed, and new index are added assertEquals(1, initialManifest.getIndices().size()); assertEquals(2, manifest1.getIndices().size()); @@ -855,7 +861,8 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { ClusterState clusterState2 = ClusterState.builder(clusterState1) .metadata(Metadata.builder(clusterState1.getMetadata()).put(indexMetadata1, true).build()) .build(); - ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1); + ClusterMetadataManifest manifest2 = remoteClusterStateService.writeIncrementalMetadata(clusterState1, clusterState2, manifest1) + .getClusterMetadataManifest(); // index1 is updated assertEquals(2, manifest2.getIndices().size()); assertEquals( @@ -888,7 +895,8 @@ private void verifyMetadataAttributeOnlyUpdated( // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_"); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); ClusterState newClusterState = clusterStateUpdater.apply(initialClusterState); @@ -899,9 +907,10 @@ private void verifyMetadataAttributeOnlyUpdated( initialClusterState, newClusterState, initialManifest - ); + ).getClusterMetadataManifest(); } else { - manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata(newClusterState, initialClusterState.stateUUID()); + manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata(newClusterState, initialClusterState.stateUUID()) + .getClusterMetadataManifest(); } assertions.accept(initialManifest, manifestAfterMetadataUpdate); @@ -1182,7 +1191,8 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { List indices = List.of(uploadedIndexMetadata); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); - final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest); + final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest) + .getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() .indices(indices) @@ -1287,7 +1297,8 @@ public void testRemoteStateStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid"); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); assertTrue(remoteClusterStateService.getStats() != null); assertEquals(1, remoteClusterStateService.getStats().getSuccessCount()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 622507f885814..e6fa149c26b22 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2562,7 +2562,8 @@ public void start(ClusterState initialState) { () -> new StatusInfo(HEALTHY, "healthy-info"), persistedStateRegistry, remoteStoreNodeService, - new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE) + new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE), + null ); clusterManagerService.setClusterStatePublisher(coordinator); coordinator.start();