Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
Anshu Agarwal committed Jan 4, 2023
1 parent 311363a commit 1d904be
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 21 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Add support for discovered master and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680))
- Add support for discovered cluster manager and remove local weights ([#5680](https://github.
com/opensearch-project/OpenSearch/pull/5680))


### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,32 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class WeightedRoutingIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class, MockRepository.Plugin.class);
}

public void testPutWeightedRouting() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
Expand Down Expand Up @@ -157,6 +172,7 @@ public void testGetWeightedRouting_WeightsNotSet() {
.setAwarenessAttribute("zone")
.get();
assertNull(weightedRoutingResponse.weights());
assertNull(weightedRoutingResponse.getDiscoveredClusterManager());
}

public void testGetWeightedRouting_WeightsAreSet() throws IOException {
Expand Down Expand Up @@ -209,6 +225,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setAwarenessAttribute("zone")
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone a
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
Expand All @@ -219,6 +236,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone b
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1)))
Expand All @@ -229,6 +247,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

// get api to fetch local node weight for a node in zone c
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1)))
Expand All @@ -239,6 +258,81 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertTrue(weightedRoutingResponse.getDiscoveredClusterManager());

}

public void testGetWeightedRouting_ClusterManagerNotDiscovered() throws Exception {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.put("cluster.fault_detection.leader_check.timeout", 10000 + "ms")
.put("cluster.fault_detection.leader_check.retry_count", 1)
.build();

int nodeCountPerAZ = 1;

logger.info("--> starting a dedicated cluster manager node");
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
// put api call to set weights
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

Set<String> nodesInOneSide = Stream.of(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0), nodes_in_zone_c.get(0))
.collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(clusterManager).collect(Collectors.toCollection(HashSet::new));

NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);

logger.info("--> network disruption is started");
networkDisruption.startDisrupting();

// wait for leader checker to fail
Thread.sleep(13000);

// get api to fetch local node weight for a node in zone a
ClusterGetWeightedRoutingResponse weightedRoutingResponse = internalCluster().client(
randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_b.get(0))
).admin().cluster().prepareGetWeightedRouting().setAwarenessAttribute("zone").setRequestLocal(true).get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertFalse(weightedRoutingResponse.getDiscoveredClusterManager());

logger.info("--> network disruption is stopped");
networkDisruption.stopDisrupting();

}

public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,31 @@ public WeightedRouting getWeightedRouting() {

private final WeightedRouting weightedRouting;

public Boolean getDiscoveredMaster() {
return discoveredMaster;
public Boolean getDiscoveredClusterManager() {
return discoveredClusterManager;
}

private final Boolean discoveredMaster;
private final Boolean discoveredClusterManager;

private static final String DISCOVERED_MASTER = "discovered_master";
private static final String DISCOVERED_CLUSTER_MANAGER = "discovered_cluster_manager";

ClusterGetWeightedRoutingResponse() {
this.weightedRouting = null;
this.discoveredMaster = null;
this.discoveredClusterManager = null;
}

public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredMaster) {
this.discoveredMaster = discoveredMaster;
public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredClusterManager) {
this.discoveredClusterManager = discoveredClusterManager;
this.weightedRouting = weightedRouting;
}

ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException {
if (in.available() != 0) {
this.weightedRouting = new WeightedRouting(in);
this.discoveredMaster = in.readOptionalBoolean();
this.discoveredClusterManager = in.readOptionalBoolean();
} else {
this.weightedRouting = null;
this.discoveredMaster = null;
this.discoveredClusterManager = null;
}
}

Expand All @@ -79,8 +79,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (weightedRouting != null) {
weightedRouting.writeTo(out);
}
if (discoveredMaster != null) {
out.writeOptionalBoolean(discoveredMaster);
if (discoveredClusterManager != null) {
out.writeOptionalBoolean(discoveredClusterManager);
}
}

Expand All @@ -91,8 +91,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) {
builder.field(entry.getKey(), entry.getValue().toString());
}
if (discoveredMaster != null) {
builder.field(DISCOVERED_MASTER, discoveredMaster);
if (discoveredClusterManager != null) {
builder.field(DISCOVERED_CLUSTER_MANAGER, discoveredClusterManager);
}
}
builder.endObject();
Expand All @@ -103,7 +103,7 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
XContentParser.Token token;
String attrKey = null, attrValue = null;
Boolean discoveredMaster = null;
Boolean discoveredClusterManager = null;
Map<String, Double> weights = new HashMap<>();

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
Expand All @@ -114,26 +114,26 @@ public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser pars
if (attrKey != null) {
weights.put(attrKey, Double.parseDouble(attrValue));
}
} else if (token == XContentParser.Token.VALUE_BOOLEAN && attrKey != null && attrKey.equals(DISCOVERED_MASTER)) {
discoveredMaster = Boolean.parseBoolean(parser.text());
} else if (token == XContentParser.Token.VALUE_BOOLEAN && attrKey != null && attrKey.equals(DISCOVERED_CLUSTER_MANAGER)) {
discoveredClusterManager = Boolean.parseBoolean(parser.text());
} else {
throw new OpenSearchParseException("failed to parse weighted routing response");
}
}
WeightedRouting weightedRouting = new WeightedRouting("", weights);
return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredMaster);
return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredClusterManager);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterGetWeightedRoutingResponse that = (ClusterGetWeightedRoutingResponse) o;
return weightedRouting.equals(that.weightedRouting) && discoveredMaster.equals(that.discoveredMaster);
return weightedRouting.equals(that.weightedRouting) && discoveredClusterManager.equals(that.discoveredClusterManager);
}

@Override
public int hashCode() {
return Objects.hash(weightedRouting, discoveredMaster);
return Objects.hash(weightedRouting, discoveredClusterManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void testGetWeightedRoutingLocalWeight_WeightsSetInMetadata() {
ClusterServiceUtils.setState(clusterService, builder);

ClusterGetWeightedRoutingResponse response = ActionTestUtils.executeBlocking(transportGetWeightedRoutingAction, request.request());
assertEquals(true, response.getDiscoveredMaster());
assertEquals(true, response.getDiscoveredClusterManager());
assertEquals(weights, response.getWeightedRouting().weights());
}

Expand Down

0 comments on commit 1d904be

Please sign in to comment.