Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Weighted Routing] Add support for discovered master and remove local weights in the response #5680

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Add support for discovered master and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680))


### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("1.0", weightedRoutingResponse.getLocalNodeWeight());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add discovered_cluster_manager assertion as well .


// get api to fetch local node weight for a node in zone b
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1)))
Expand All @@ -230,7 +229,6 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("2.0", weightedRoutingResponse.getLocalNodeWeight());

// get api to fetch local node weight for a node in zone c
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1)))
Expand All @@ -241,7 +239,6 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("3.0", weightedRoutingResponse.getLocalNodeWeight());
}

public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,37 @@
* @opensearch.internal
*/
public class ClusterGetWeightedRoutingResponse extends ActionResponse implements ToXContentObject {
private WeightedRouting weightedRouting;
private String localNodeWeight;
private static final String NODE_WEIGHT = "node_weight";
public WeightedRouting getWeightedRouting() {
return weightedRouting;
}

private final WeightedRouting weightedRouting;

public String getLocalNodeWeight() {
return localNodeWeight;
public Boolean getDiscoveredMaster() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets use cluster_manager everywhere . master is going to get deprecated in 3.0.

return discoveredMaster;
}

private final Boolean discoveredMaster;

private static final String DISCOVERED_MASTER = "discovered_master";

ClusterGetWeightedRoutingResponse() {
this.weightedRouting = null;
this.discoveredMaster = null;
}

public ClusterGetWeightedRoutingResponse(String localNodeWeight, WeightedRouting weightedRouting) {
this.localNodeWeight = localNodeWeight;
public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredMaster) {
this.discoveredMaster = discoveredMaster;
this.weightedRouting = weightedRouting;
}

ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
this.discoveredMaster = in.readOptionalBoolean();
} else {
this.weightedRouting = null;
this.discoveredMaster = null;
}
}

Expand All @@ -68,6 +79,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
}
if (discoveredMaster != null) {
out.writeOptionalBoolean(discoveredMaster);
}
}

@Override
Expand All @@ -77,8 +91,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue().toString());
}
if (localNodeWeight != null) {
builder.field(NODE_WEIGHT, localNodeWeight);
if (discoveredMaster != null) {
builder.field(DISCOVERED_MASTER, discoveredMaster);
}
}
builder.endObject();
Expand All @@ -89,37 +103,37 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
XContentParser.Token token;
String attrKey = null, attrValue = null;
String localNodeWeight = null;
Boolean discoveredMaster = null;
Map<String, Double> weights = new HashMap<>();

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
attrKey = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
attrValue = parser.text();
if (attrKey != null && attrKey.equals(NODE_WEIGHT)) {
localNodeWeight = attrValue;
} else if (attrKey != null) {
if (attrKey != null) {
weights.put(attrKey, Double.parseDouble(attrValue));
}
} else if (token == XContentParser.Token.VALUE_BOOLEAN && attrKey != null && attrKey.equals(DISCOVERED_MASTER)) {
discoveredMaster = Boolean.parseBoolean(parser.text());
} else {
throw new OpenSearchParseException("failed to parse weighted routing response");
}
}
WeightedRouting weightedRouting = new WeightedRouting("", weights);
return new ClusterGetWeightedRoutingResponse(localNodeWeight, weightedRouting);
return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredMaster);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o;
return weightedRouting.equals(that.weightedRouting) && localNodeWeight.equals(that.localNodeWeight);
return weightedRouting.equals(that.weightedRouting) && discoveredMaster.equals(that.discoveredMaster);
}

@Override
public int hashCode() {
return Objects.hash(weightedRouting, localNodeWeight);
return Objects.hash(weightedRouting, discoveredMaster);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;

import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.cluster.routing.WeightedRoutingService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -89,19 +88,12 @@ protected void clusterManagerOperation(
weightedRoutingService.verifyAwarenessAttribute(request.getAwarenessAttribute());
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().custom(WeightedRoutingMetadata.TYPE);
ClusterGetWeightedRoutingResponse clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse();
String weight = null;
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (request.local()) {
DiscoveryNode localNode = state.getNodes().getLocalNode();
if (localNode.getAttributes().get(request.getAwarenessAttribute()) != null) {
String attrVal = localNode.getAttributes().get(request.getAwarenessAttribute());
if (weightedRouting.weights().containsKey(attrVal)) {
weight = weightedRouting.weights().get(attrVal).toString();
}
}
}
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(weight, weightedRouting);
clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse(
weightedRouting,
state.nodes().getClusterManagerNodeId() != null
);
}
listener.onResponse(clusterGetWeightedRoutingResponse);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest
protected ClusterGetWeightedRoutingResponse createTestInstance() {
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("", weights);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse("1", weightedRouting);
ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse(weightedRouting, true);
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ public void testGetWeightedRouting_WeightsNotSetInMetadata() {
ClusterState state = clusterService.state();

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals(response.getLocalNodeWeight(), null);
assertEquals(response.weights(), null);
}

Expand Down Expand Up @@ -231,7 +230,8 @@ public void testGetWeightedRoutingLocalWeight_WeightsSetInMetadata() {
ClusterServiceUtils.setState(clusterService, builder);

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals("0.0", response.getLocalNodeWeight());
assertEquals(true, response.getDiscoveredMaster());
assertEquals(weights, response.getWeightedRouting().weights());
}

public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() {
Expand All @@ -250,7 +250,7 @@ public void testGetWeightedRoutingLocalWeight_WeightsNotSetInMetadata() {
ClusterServiceUtils.setState(clusterService, builder);

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals(null, response.getLocalNodeWeight());
assertEquals(null, response.getWeightedRouting());
}

@After
Expand Down