Skip to content

Commit

Permalink
Call WRR API's
Browse files Browse the repository at this point in the history
Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com>
  • Loading branch information
pranikum committed Sep 1, 2022
1 parent 662ef64 commit 032bd5e
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsAction;
import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsRequest;
import org.opensearch.action.admin.cluster.shards.routing.wrr.put.ClusterPutWRRWeightsResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.ClusterStateTaskConfig;
Expand All @@ -33,7 +34,6 @@
import org.opensearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WRRWeights;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
Expand All @@ -46,6 +46,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -145,35 +146,51 @@ public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOExcepti

public void handleNodesDecommissionRequest(
Set<DiscoveryNode> nodesToBeDecommissioned,
List<String> zones,
String reason,
TimeValue timeout,
ActionListener<Void> nodesRemovedListener
) {
setWeightForDecommissionedZone();
setWeightForDecommissionedZone(zones);
checkHttpStatsForDecommissionedNodes(nodesToBeDecommissioned, reason, timeout, nodesRemovedListener);
}

private void setWeightForDecommissionedZone() {
private void setWeightForDecommissionedZone(List<String> zones) {
ClusterState clusterState = clusterService.getClusterApplierService().state();

final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest(nodes);
DecommissionAttributeMetadata decommissionAttributeMetadata = clusterState.metadata().custom(DecommissionAttributeMetadata.TYPE);
assert decommissionAttributeMetadata.status().equals(DecommissionStatus.DECOMMISSION_INIT)
: "unexpected status encountered while decommissioning nodes";
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();

Map<String, String> weights = new HashMap<>();
zones.forEach(zone -> {
if (zone.equalsIgnoreCase(decommissionAttribute.attributeValue())) {
weights.put(zone, "0");
} else {
weights.put(zone, "1");
}
});

// The WRR API validates the supplied weights and rejects invalid values
final ClusterPutWRRWeightsRequest clusterWeightRequest = new ClusterPutWRRWeightsRequest();
clusterWeightRequest.attributeName("zone");
WRRWeights wrrWeights = new WRRWeights("zone", Map.of())
nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName());
clusterWeightRequest.setWRRWeight(weights);

transportService.sendRequest(
transportService.getLocalNode(),
ClusterPutWRRWeightsAction.NAME,
nodesStatsRequest,
new TransportResponseHandler<NodesStatsResponse>() {
clusterWeightRequest,
new TransportResponseHandler<ClusterPutWRRWeightsResponse>() {
@Override
public void handleResponse(NodesStatsResponse response) {
listener.onResponse(response);
public void handleResponse(ClusterPutWRRWeightsResponse response) {
logger.info("Weights were set successfully set.");
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
logger.info("Exception occurred while setting weights.Exception Messages - ",
exp.unwrapCause().getMessage());
}

@Override
Expand All @@ -182,12 +199,11 @@ public String executor() {
}

@Override
public NodesStatsResponse read(StreamInput in) throws IOException {
return new NodesStatsResponse(in);
public ClusterPutWRRWeightsResponse read(StreamInput in) throws IOException {
return new ClusterPutWRRWeightsResponse(in);
}
});
}
}

void updateClusterStatusForDecommissioning(
Set<DiscoveryNode> nodesToBeDecommissioned,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

private void initiateGracefulDecommission() {



decommissionController.updateMetadataWithDecommissionStatus(
DecommissionStatus.DECOMMISSION_IN_PROGRESS,
new ActionListener<Void>() {
Expand Down Expand Up @@ -300,9 +298,13 @@ private void failDecommissionedNodes(ClusterState state) {
: "unexpected status encountered while decommissioning nodes";
DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute();

// Awareness values refer to all zones in the cluster
List<String> awarenessValues = forcedAwarenessAttributes.get(decommissionAttribute.attributeName());

// execute nodes decommissioning
decommissionController.handleNodesDecommissionRequest(
nodesWithDecommissionAttribute(state, decommissionAttribute, false),
awarenessValues,
"nodes-decommissioned",
TimeValue.timeValueSeconds(30L), // TODO - read timeout from request while integrating with API
new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchParseException;
import org.opensearch.Version;
import org.opensearch.cluster.AbstractNamedDiffable;
import org.opensearch.cluster.NamedDiff;
import org.opensearch.cluster.routing.WRRWeights;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

/**
 * Cluster-state custom metadata that stores the weighted round-robin (WRR) shard-routing
 * weights for a single awareness attribute (e.g. {@code zone} mapped to per-value weights).
 *
 * @opensearch.internal
 */
public class WeightedRoundRobinMetadata extends AbstractNamedDiffable<Metadata.Custom> implements Metadata.Custom {
    private static final Logger logger = LogManager.getLogger(WeightedRoundRobinMetadata.class);

    // Key under which this custom metadata is registered in the cluster state.
    public static final String TYPE = "wrr_shard_routing";

    // The single weight set held by this metadata: one awareness attribute name plus its value->weight map.
    private WRRWeights wrrWeight;

    /** Returns the stored WRR weights. */
    public WRRWeights getWrrWeight() {
        return wrrWeight;
    }

    /** Replaces the stored WRR weights. */
    public void setWrrWeight(WRRWeights wrrWeight) {
        this.wrrWeight = wrrWeight;
    }

    /**
     * Deserializes the metadata from the transport wire format.
     *
     * @param in stream to read the weights from
     * @throws IOException if reading from the stream fails
     */
    public WeightedRoundRobinMetadata(StreamInput in) throws IOException {
        this.wrrWeight = new WRRWeights(in);
    }

    /**
     * Creates metadata wrapping the given weights.
     *
     * @param wrrWeight the weights to store; not defensively copied
     */
    public WeightedRoundRobinMetadata(WRRWeights wrrWeight) {
        this.wrrWeight = wrrWeight;
    }

    /** This metadata is exposed over the API and also persisted via the gateway. */
    @Override
    public EnumSet<Metadata.XContentContext> context() {
        return Metadata.API_AND_GATEWAY;
    }

    @Override
    public String getWriteableName() {
        return TYPE;
    }

    /** Oldest node version that can read this metadata on the wire. */
    @Override
    public Version getMinimalSupportedVersion() {
        return Version.V_2_3_0;

    }

    /** Serialization is delegated entirely to {@link WRRWeights}. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        wrrWeight.writeTo(out);
    }

    /** Reads a named diff for this custom type from the stream. */
    public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOException {
        return readDiffFrom(Metadata.Custom.class, TYPE, in);
    }

    /**
     * Parses metadata of the shape written by {@link #toXContent(WRRWeights, XContentBuilder)}:
     * {@code {"awareness": {"<attributeName>": {"<value>": "<weight>", ...}}}}.
     * Only string weight values are accepted; any other value token fails the parse.
     *
     * @param parser positioned at the start of this custom's object
     * @return the parsed metadata
     * @throws OpenSearchParseException if the structure or a value type is unexpected
     * @throws IOException if the underlying parser fails
     */
    public static WeightedRoundRobinMetadata fromXContent(XContentParser parser) throws IOException {
        String attrKey = null;
        Object attrValue;
        String attributeName = null;
        Map<String, Object> weights = new HashMap<>();
        WRRWeights wrrWeight = null;
        XContentParser.Token token;
        // advance one token past the enclosing START_OBJECT of this metadata section
        parser.nextToken();
        String awarenessKeyword = null;

        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token == XContentParser.Token.FIELD_NAME) {
                // outer field, e.g. "awareness"
                awarenessKeyword = parser.currentName();
                // the awareness object wraps one object per awareness attribute name
                if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
                    throw new OpenSearchParseException("failed to parse wrr metadata [{}], expected object", awarenessKeyword);
                }
                while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                    // attribute name, e.g. "zone"; if several appear, only the last one survives into WRRWeights
                    attributeName = parser.currentName();
                    // the attribute object contains the attribute-value -> weight pairs
                    if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
                        throw new OpenSearchParseException("failed to parse wrr metadata [{}], expected object", attributeName);
                    }
                    // collect each value/weight pair into the weights map
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            attrKey = parser.currentName();
                        } else if (token == XContentParser.Token.VALUE_STRING) {
                            attrValue = parser.text();
                            weights.put(attrKey, attrValue);
                        } else {
                            // NOTE(review): numeric weight tokens are rejected here — assumes weights
                            // are always serialized as strings; confirm against the writer side
                            throw new OpenSearchParseException("failed to parse wrr metadata attribute [{}], unknown type", attributeName);
                        }
                    }
                }
            } else {
                throw new OpenSearchParseException("failed to parse wrr metadata attribute [{}]", attributeName);
            }
        }
        wrrWeight = new WRRWeights(attributeName, weights);
        return new WeightedRoundRobinMetadata(wrrWeight);
    }

    /** Equality is defined solely by the wrapped {@link WRRWeights}. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WeightedRoundRobinMetadata that = (WeightedRoundRobinMetadata) o;
        return wrrWeight.equals(that.wrrWeight);
    }

    @Override
    public int hashCode() {
        return wrrWeight.hashCode();
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
        toXContent(wrrWeight, builder);
        return builder;
    }

    /**
     * Writes the weights as {@code {"awareness": {"<attributeName>": {value: weight, ...}}}} —
     * the exact shape {@link #fromXContent(XContentParser)} reads back.
     */
    public static void toXContent(WRRWeights wrrWeight, XContentBuilder builder) throws IOException {
        builder.startObject("awareness");
        builder.startObject(wrrWeight.attributeName());
        for (Map.Entry<String, Object> entry : wrrWeight.weights().entrySet()) {
            builder.field(entry.getKey(), entry.getValue());
        }
        builder.endObject();
        builder.endObject();
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }
}

0 comments on commit 032bd5e

Please sign in to comment.