Skip to content

Commit

Permalink
Update Integ tests for Search WRR to include tests post removing WRR
Browse files Browse the repository at this point in the history
  • Loading branch information
Anshu Agarwal committed Sep 7, 2022
1 parent 8ce5255 commit c6ed81c
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.search;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.wrr.delete.ClusterDeleteWRRWeightsResponse;
import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -30,7 +31,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 2)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class SearchWRRIT extends OpenSearchIntegTestCase {
@Override
protected int numberOfReplicas() {
Expand All @@ -39,8 +40,7 @@ protected int numberOfReplicas() {

public void testSearchWithWRRShardRouting() throws IOException {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" +
".values", "a,b,c")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

Expand All @@ -62,23 +62,20 @@ public void testSearchWithWRRShardRouting() throws IOException {
String C_1 = nodes.get(5);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health =
client().admin().cluster().prepareHealth().setWaitForNodes("6").execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("6").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

assertAcked(
prepareCreate("test").setSettings(
Settings.builder().put("index.number_of_shards", 10).put("index.number_of_replicas", 2)
)
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 10).put("index.number_of_replicas", 2))
);
ensureGreen();
logger.info("--> creating indices for test");
for (int i = 0; i < 100; i++) {
client().prepareIndex("test_"+i).setId("" + i).setSource("field_"+i, "value_"+i).get();
client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
}

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Object> weights = Map.of("a", "1", "b", "1", "c", "0");
Map<String, Object> weights = Map.of("a", "2", "b", "1", "c", "0");
WRRWeights wrrWeight = new WRRWeights("zone", weights);

ClusterPutWRRWeightsResponse response = client().admin().cluster().prepareWRRWeights().setWRRWeights(wrrWeight).get();
Expand All @@ -87,28 +84,50 @@ public void testSearchWithWRRShardRouting() throws IOException {
Set<String> hitNodes = new HashSet<>();
// making search requests
for (int i = 0; i < 50; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1)).prepareSearch()
SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
for (int j = 0; j <searchResponse.getHits().getHits().length; j++) {
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}
// search should not go to nodes in zone c
assertThat(hitNodes.size(), lessThanOrEqualTo( 4));
assertThat(hitNodes.size(), lessThanOrEqualTo(4));
DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes();
List<String> nodeIdsFromZoneWithWeightZero = new ArrayList<>();
for (DiscoveryNode node : dataNodes) {
if(node.getAttributes().get("zone").equals("c"))
{
if (node.getAttributes().get("zone").equals("c")) {
nodeIdsFromZoneWithWeightZero.add(node.getId());
}
}
for(String nodeId : nodeIdsFromZoneWithWeightZero) {
for (String nodeId : nodeIdsFromZoneWithWeightZero) {
assertFalse(hitNodes.contains(nodeId));
}

logger.info("--> deleted shard routing weights for weighted round robin");

ClusterDeleteWRRWeightsResponse deleteResponse = client().admin().cluster().prepareDeleteWRRWeights().get();
assertEquals(response.isAcknowledged(), true);

hitNodes = new HashSet<>();
// making search requests
for (int i = 0; i < 50; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
for (int j = 0; j < searchResponse.getHits().getHits().length; j++) {
hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId());
}
}

// Check shard routing requests hit data nodes in zone c
for (String nodeId : nodeIdsFromZoneWithWeightZero) {
assertFalse(!hitNodes.contains(nodeId));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
* @opensearch.internal
*/
public class ClusterDeleteWRRWeightsRequest extends ClusterManagerNodeRequest<ClusterDeleteWRRWeightsRequest> {
public ClusterDeleteWRRWeightsRequest() {
}
public ClusterDeleteWRRWeightsRequest() {}

public ClusterDeleteWRRWeightsRequest(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ protected void clusterManagerOperation(
String weight = null;
if (weightedRoundRobinMetadata != null && weightedRoundRobinMetadata.getWrrWeight() != null) {
WRRWeights wrrWeights = weightedRoundRobinMetadata.getWrrWeight();
if (request.local()) {
DiscoveryNode localNode = state.getNodes().getLocalNode();
if (localNode.getAttributes().get(request.getAwarenessAttribute())!=null) {
DiscoveryNode localNode = state.getNodes().getLocalNode();
if (request.local() && !localNode.isClusterManagerNode()) {
if (localNode.getAttributes().get(request.getAwarenessAttribute()) != null) {
String attrVal = localNode.getAttributes().get(request.getAwarenessAttribute());
if (wrrWeights.weights().containsKey(attrVal)) {
weight = wrrWeights.weights().get(attrVal).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ public interface ClusterAdminClient extends OpenSearchClient {
* sampled for the node ids specified in the request. Nodes usage of the
* cluster.
*
* @param request The nodes usage request
* @param request
* The nodes usage request
* @return The result future
* @see org.opensearch.client.Requests#nodesUsageRequest(String...)
*/
Expand Down

0 comments on commit c6ed81c

Please sign in to comment.