Skip to content

Commit

Permalink
Unset discovery nodes in all transport node actions request.
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <gupthasg@amazon.com>
  • Loading branch information
Swetha Guptha committed Aug 22, 2024
1 parent 49b7cd4 commit 96657b6
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 122 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Unset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131))
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] Add queryGroupId to Task ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,8 @@ private boolean isNodeTransportTLSEnabled() {
}

private boolean isNodeLocal(NodesReloadSecureSettingsRequest request) {
if (null == request.concreteNodes()) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
}
final DiscoveryNode[] nodes = request.concreteNodes();
final DiscoveryNode[] nodes = resolveRequest(request, clusterService.state());
assert nodes != null;
return nodes.length == 1 && nodes[0].getId().equals(clusterService.localNode().getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* */
private DiscoveryNode[] concreteNodes;

/**
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
* BaseNodeRequest, we do not require it to sent around across all nodes.
*
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean includeDiscoveryNodes = true;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
Expand Down Expand Up @@ -127,14 +120,6 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void setIncludeDiscoveryNodes(boolean value) {
includeDiscoveryNodes = value;
}

public boolean getIncludeDiscoveryNodes() {
return includeDiscoveryNodes;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,16 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) {

/**
* resolve node ids to concrete nodes of the incoming request
**/
protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
*
* @return
*/
protected DiscoveryNode[] resolveRequest(NodesRequest request, ClusterState clusterState) {
if (request.concreteNodes() != null) {
return request.concreteNodes();
}
assert request.concreteNodes() == null : "request concreteNodes shouldn't be set";
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new);
}

/**
Expand Down Expand Up @@ -234,23 +239,16 @@ class AsyncAction {
this.task = task;
this.request = request;
this.listener = listener;
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
this.concreteNodes = request.concreteNodes();

if (request.getIncludeDiscoveryNodes() == false) {
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class, we
// remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
}
this.concreteNodes = resolveRequest(request, clusterService.state());
this.responses = new AtomicReferenceArray<>(this.concreteNodes.length);
}

void start() {
final DiscoveryNode[] nodes = this.concreteNodes;
// As we transfer the ownership of discovery nodes to route the request to into the AsyncAction class,
// we remove the list of DiscoveryNodes from the request. This reduces the payload of the request and improves
// the number of concrete nodes in the memory.
request.setConcreteNodes(null);
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.setIncludeDiscoveryNodes(false);
clusterStatsRequest.useAggregatedNodeLevelResponses(true);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);
nodesInfoRequest.setIncludeDiscoveryNodes(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.setIncludeDiscoveryNodes(false);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.setIncludeDiscoveryNodes(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
Expand All @@ -138,7 +137,6 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.setIncludeDiscoveryNodes(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,48 +33,12 @@

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testClusterStatsActionWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

public void testClusterStatsActionWithPreFilledConcreteNodesAndWithRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testClusterStatsActionWithoutRetentionOfDiscoveryNodesList() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand All @@ -88,7 +52,6 @@ public void testClusterStatsActionWithPreFilledConcreteNodesAndWithoutRetentionO
ClusterStatsRequest request = new ClusterStatsRequest();
Collection<DiscoveryNode> discoveryNodes = clusterService.state().getNodes().getNodes().values();
request.setConcreteNodes(discoveryNodes.toArray(DiscoveryNode[]::new));
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ private static class DataNodesOnlyTransportNodesAction extends TestTransportNode
}

@Override
protected void resolveRequest(TestNodesRequest request, ClusterState clusterState) {
request.setConcreteNodes(clusterState.nodes().getDataNodes().values().toArray(new DiscoveryNode[0]));
protected DiscoveryNode[] resolveRequest(TestNodesRequest request, ClusterState clusterState) {
return clusterState.nodes().getDataNodes().values().toArray(new DiscoveryNode[0]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,12 @@

public class TransportNodesInfoActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testNodesInfoActionWithRetentionOfDiscoveryNodesList() {
NodesInfoRequest request = new NodesInfoRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockNodesInfoRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testNodesInfoActionWithoutRetentionOfDiscoveryNodesList() {
NodesInfoRequest request = new NodesInfoRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockNodesInfoRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,11 @@
public class TransportNodesStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testNodesStatsActionWithRetentionOfDiscoveryNodesList() {
NodesStatsRequest request = new NodesStatsRequest();
request.setIncludeDiscoveryNodes(true);
Map<String, List<MockNodeStatsRequest>> combinedSentRequest = performNodesStatsAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
* We don't want to send discovery nodes list to each request that is sent across from the coordinator node.
* This behavior is asserted in this test.
*/
public void testNodesStatsActionWithoutRetentionOfDiscoveryNodesList() {
NodesStatsRequest request = new NodesStatsRequest();
request.setIncludeDiscoveryNodes(false);
Map<String, List<MockNodeStatsRequest>> combinedSentRequest = performNodesStatsAction(request);

assertNotNull(combinedSentRequest);
Expand Down

0 comments on commit 96657b6

Please sign in to comment.