Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote state publication transport call #13835

Merged
merged 7 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;

import java.io.Closeable;
Expand All @@ -52,6 +53,7 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -79,6 +81,7 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
private final boolean isRemoteStateEnabled;
private final boolean isRemotePublicationEnabled;

public CoordinationState(
DiscoveryNode localNode,
Expand All @@ -102,6 +105,12 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
this.isRemotePublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL)
&& localNode.isRemoteStatePublicationEnabled();
}

public boolean isRemotePublicationEnabled() {
return isRemotePublicationEnabled;
}

public long getCurrentTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.discovery.PeerFinder;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -209,7 +210,8 @@ public Coordinator(
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService,
ClusterManagerMetrics clusterManagerMetrics
ClusterManagerMetrics clusterManagerMetrics,
RemoteClusterStateService remoteClusterStateService
soosinha marked this conversation as resolved.
Show resolved Hide resolved
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -261,7 +263,8 @@ public Coordinator(
transportService,
namedWriteableRegistry,
this::handlePublishRequest,
this::handleApplyCommit
this::handleApplyCommit,
remoteClusterStateService
);
this.leaderChecker = new LeaderChecker(
settings,
Expand Down Expand Up @@ -1330,7 +1333,9 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
+ clusterState;

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled(),
persistedStateRegistry
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
);

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.IncompatibleClusterStateVersionException;
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.gateway.GatewayMetaState.RemotePersistedState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.BytesTransportRequest;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -74,6 +78,7 @@ public class PublicationTransportHandler {
private static final Logger logger = LogManager.getLogger(PublicationTransportHandler.class);

public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state";
public static final String PUBLISH_REMOTE_STATE_ACTION_NAME = "internal:cluster/coordination/publish_remote_state";
public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state";

private final TransportService transportService;
Expand All @@ -97,16 +102,19 @@ public class PublicationTransportHandler {
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE)
.build();
private final RemoteClusterStateService remoteClusterStateService;

public PublicationTransportHandler(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
RemoteClusterStateService remoteClusterStateService
) {
this.transportService = transportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.handlePublishRequest = handlePublishRequest;
this.remoteClusterStateService = remoteClusterStateService;

transportService.registerRequestHandler(
PUBLISH_STATE_ACTION_NAME,
Expand All @@ -117,6 +125,15 @@ public PublicationTransportHandler(
(request, channel, task) -> channel.sendResponse(handleIncomingPublishRequest(request))
);

transportService.registerRequestHandler(
PUBLISH_REMOTE_STATE_ACTION_NAME,
ThreadPool.Names.GENERIC,
false,
false,
RemotePublishRequest::new,
(request, channel, task) -> channel.sendResponse(handleIncomingRemotePublishRequest(request))
);

transportService.registerRequestHandler(
COMMIT_STATE_ACTION_NAME,
ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -211,6 +228,74 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.trace(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
soosinha marked this conversation as resolved.
Show resolved Hide resolved
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
request.getClusterName(),
manifest,
lastSeen,
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
Expand All @@ -224,8 +309,35 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(new PublishRequest(incomingState));
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent);
private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
if (publishRequest == null
|| publishRequest.getAcceptedState().coordinationMetadata().term() != remotePublishRequest.term
|| publishRequest.getAcceptedState().version() != remotePublishRequest.version) {
logger.debug(
() -> new ParameterizedMessage(
"Publication failure for current publish request : {} and remote publish request: {}",
publishRequest,
remotePublishRequest
)
);
throw new IllegalStateException("publication to self failed for " + remotePublishRequest);
}
PublishWithJoinResponse publishWithJoinResponse = handlePublishRequest.apply(publishRequest);
lastSeenClusterState.set(publishRequest.getAcceptedState());
return publishWithJoinResponse;
}

public PublicationContext newPublicationContext(
ClusterChangedEvent clusterChangedEvent,
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
final PublicationContext publicationContext = new PublicationContext(
clusterChangedEvent,
isRemotePublicationEnabled,
persistedStateRegistry
);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand All @@ -234,6 +346,16 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
return publicationContext;
}

// package private for testing
void setCurrentPublishRequestToSelf(PublishRequest publishRequest) {
this.currentPublishRequestToSelf.set(publishRequest);
}

// package private for testing
void setLastSeenClusterState(ClusterState clusterState) {
this.lastSeenClusterState.set(clusterState);
}

private static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
final BytesReference serializedState = CompressedStreamUtils.createCompressedStream(nodeVersion, stream -> {
stream.writeBoolean(true);
Expand Down Expand Up @@ -270,12 +392,20 @@ public class PublicationContext {
private final boolean sendFullVersion;
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;
private final PersistedStateRegistry persistedStateRegistry;

PublicationContext(ClusterChangedEvent clusterChangedEvent) {
PublicationContext(
ClusterChangedEvent clusterChangedEvent,
boolean isRemotePublicationEnabled,
PersistedStateRegistry persistedStateRegistry
) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
sendRemoteState = isRemotePublicationEnabled;
this.persistedStateRegistry = persistedStateRegistry;
}

void buildDiffAndSerializeStates() {
Expand Down Expand Up @@ -339,7 +469,11 @@ public void onFailure(Exception e) {
} else {
responseActionListener = listener;
}
if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
// TODO Decide to send remote state before starting publication by checking remote publication on all nodes
if (sendRemoteState && destination.isRemoteStatePublicationEnabled()) {
soosinha marked this conversation as resolved.
Show resolved Hide resolved
logger.trace("sending remote cluster state version [{}] to [{}]", newState.version(), destination);
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
sendFullClusterState(destination, responseActionListener);
} else {
Expand Down Expand Up @@ -384,6 +518,61 @@ public String executor() {
);
}

private void sendRemoteClusterState(
final DiscoveryNode destination,
final ClusterState clusterState,
final ActionListener<PublishWithJoinResponse> listener
) {
try {
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(
discoveryNodes.getLocalNode(),
clusterState.term(),
clusterState.getVersion(),
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID(),
manifestFileName
);
final Consumer<TransportException> transportExceptionHandler = exp -> {
logger.debug(() -> new ParameterizedMessage("failed to send remote cluster state to {}", destination), exp);
listener.onFailure(exp);
};
final TransportResponseHandler<PublishWithJoinResponse> responseHandler = new TransportResponseHandler<>() {

@Override
public PublishWithJoinResponse read(StreamInput in) throws IOException {
return new PublishWithJoinResponse(in);
}

@Override
public void handleResponse(PublishWithJoinResponse response) {
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
transportExceptionHandler.accept(exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}
};
transportService.sendRequest(
destination,
PUBLISH_REMOTE_STATE_ACTION_NAME,
remotePublishRequest,
stateRequestOptions,
responseHandler
);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
soosinha marked this conversation as resolved.
Show resolved Hide resolved
listener.onFailure(e);
}
}

private void sendFullClusterState(DiscoveryNode destination, ActionListener<PublishWithJoinResponse> listener) {
BytesReference bytes = serializedStates.get(destination.getVersion());
if (bytes == null) {
Expand Down
Loading
Loading