Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Weighted Routing] Weighted routing metadata to support multiple awareness attributes #5580

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.cluster.routing;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -306,10 +308,12 @@ public void testDeleteWeightedRouting_WeightsNotSet() {

assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
ResourceNotFoundException exception = expectThrows(
ResourceNotFoundException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setAwarenessAttribute("zone").get()
);
assertEquals(exception.status(), RestStatus.NOT_FOUND);
assertTrue(exception.getMessage().contains("weighted routing metadata does not have weights set for awareness attribute zone"));
}

public void testDeleteWeightedRouting_WeightsAreSet() {
Expand Down Expand Up @@ -344,8 +348,120 @@ public void testDeleteWeightedRouting_WeightsAreSet() {
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testWeightedRoutingAPIs_WeightsSetForTwoAwarenessAttribute() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone, rack")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 6 nodes on different zones");
int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRoutingForZone = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRoutingForZone)
.get();
assertEquals(response.isAcknowledged(), true);

// put call made on a data node in zone a
response = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRoutingForZone)
.get();
assertEquals(response.isAcknowledged(), true);

// put call made with different awareness attribute

weights = Map.of("a", 1.0, "b", 0.0, "c", 3.0);
WeightedRouting weightedRoutingForRack = new WeightedRouting("rack", weights);
response = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRoutingForRack)
.get();
assertEquals(response.isAcknowledged(), true);

// get api call with awareness attribute zone
ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertEquals(weightedRoutingForZone, weightedRoutingResponse.weights());

// get api call with awareness attribute rack
weightedRoutingResponse = client().admin().cluster().prepareGetWeightedRouting().setAwarenessAttribute("rack").get();
assertEquals(weightedRoutingForRack, weightedRoutingResponse.weights());

// delete api call with awareness attribute zone

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertTrue(deleteResponse.isAcknowledged());

// delete api call with awareness attribute zone, 404
ResourceNotFoundException exception = expectThrows(
ResourceNotFoundException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setAwarenessAttribute("zone").get()
);
assertEquals(exception.status(), RestStatus.NOT_FOUND);
assertTrue(exception.getMessage().contains("weighted routing metadata does not have weights set for awareness attribute zone"));

// delete api call with awareness attribute rack
deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setAwarenessAttribute("rack").get();
assertTrue(deleteResponse.isAcknowledged());

// delete api call with awareness attribute rack, 404
exception = expectThrows(
ResourceNotFoundException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setAwarenessAttribute("rack").get()
);
assertEquals(exception.status(), RestStatus.NOT_FOUND);
assertTrue(
exception.getMessage().contains("weighted routing metadata does not have weights set for awareness" + " attribute rack")
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ public void testSearchWithWRRShardRouting() throws IOException {

logger.info("--> deleted shard routing weights for weighted round robin");

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertEquals(deleteResponse.isAcknowledged(), true);

hitNodes = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,52 @@

import java.io.IOException;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Request to delete weights for weighted round-robin shard routing policy.
*
* @opensearch.internal
*/
public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest<ClusterDeleteWeightedRoutingRequest> {
private String awarenessAttribute;

public String getAwarenessAttribute() {
return awarenessAttribute;
}

public void setAwarenessAttribute(String awarenessAttribute) {
this.awarenessAttribute = awarenessAttribute;
}

public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) {
this.awarenessAttribute = awarenessAttribute;
}

public ClusterDeleteWeightedRoutingRequest() {}

public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException {
super(in);
awarenessAttribute = in.readString();
}

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (awarenessAttribute == null || awarenessAttribute.isEmpty()) {
validationException = addValidationError("Awareness attribute is missing", validationException);
}
return validationException;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(awarenessAttribute);
}

@Override
public String toString() {
return "ClusterDeleteWeightedRoutingRequest";
return "ClusterDeleteWeightedRoutingRequest { awarenessAttribute= " + awarenessAttribute + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNo
public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) {
super(client, action, new ClusterDeleteWeightedRoutingRequest());
}

public ClusterDeleteWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) {
request.setAwarenessAttribute(attribute);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,5 @@ public ClusterDeleteWeightedRoutingResponse(boolean acknowledged) {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*/
public class ClusterGetWeightedRoutingRequest extends ClusterManagerNodeReadRequest<ClusterGetWeightedRoutingRequest> {

String awarenessAttribute;
private String awarenessAttribute;

public String getAwarenessAttribute() {
return awarenessAttribute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ protected void clusterManagerOperation(
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().custom(WeightedRoutingMetadata.TYPE);
ClusterGetWeightedRoutingResponse clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse();
String weight = null;
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting() != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting(request.getAwarenessAttribute()) != null) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(request.getAwarenessAttribute());
if (request.local()) {
DiscoveryNode localNode = state.getNodes().getLocalNode();
if (localNode.getAttributes().get(request.getAwarenessAttribute()) != null) {
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/opensearch/client/Requests.java
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,8 @@ public static ClusterGetWeightedRoutingRequest getWeightedRoutingRequest(String
*
* @return delete weight request
*/
public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest() {
return new ClusterDeleteWeightedRoutingRequest();
public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest(String attributeName) {
return new ClusterDeleteWeightedRoutingRequest(attributeName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,14 @@ private static void validateAwarenessAttribute(

private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState state, DecommissionAttribute decommissionAttribute) {
WeightedRoutingMetadata weightedRoutingMetadata = state.metadata().weightedRoutingMetadata();
if (weightedRoutingMetadata == null) {
if (weightedRoutingMetadata == null || weightedRoutingMetadata.getWeightedRouting(decommissionAttribute.attributeName()) == null) {
throw new DecommissioningFailedException(
decommissionAttribute,
"no weights are set to the attribute. Please set appropriate weights before triggering decommission action"
);
}
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting();
if (weightedRouting.attributeName().equals(decommissionAttribute.attributeName()) == false) {
WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(decommissionAttribute.attributeName());
if (weightedRouting == null) {
throw new DecommissioningFailedException(
decommissionAttribute,
"no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]"
Expand Down
Loading