Skip to content

Commit

Permalink
Fix for deserilization bug in weighted round robin metadata (opensear…
Browse files Browse the repository at this point in the history
…ch-project#11679)

* Fix for deserialization bug in weighted round robin metadata

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>

* Add changelog

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>

* Add null check

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>

* Add integ test

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>

* spotless fix

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>

---------

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
Co-authored-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
anshu1106 and Anshu Agarwal committed Feb 12, 2024
1 parent 4dc5f49 commit cf1a0a2
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix for deserilization bug in weighted round-robin metadata ([#11679](https://github.com/opensearch-project/OpenSearch/pull/11679))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
assertFalse(nodeLocalHealth.isTimedOut());
assertTrue(nodeLocalHealth.hasDiscoveredClusterManager());
}

public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());

// check weighted routing metadata after node restart, ensure node comes healthy after restart
internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Contains metadata for weighted routing
Expand Down Expand Up @@ -99,7 +100,7 @@ public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOE
public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException {
String attrKey = null;
Double attrValue;
String attributeName = null;
String attributeName = "";
Map<String, Double> weights = new HashMap<>();
WeightedRouting weightedRouting;
XContentParser.Token token;
Expand Down Expand Up @@ -162,12 +163,12 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WeightedRoutingMetadata that = (WeightedRoutingMetadata) o;
return weightedRouting.equals(that.weightedRouting);
return weightedRouting.equals(that.weightedRouting) && version == that.version;
}

@Override
public int hashCode() {
return weightedRouting.hashCode();
return Objects.hash(weightedRouting.hashCode(), version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public boolean isSet() {

@Override
public void writeTo(StreamOutput out) throws IOException {

out.writeString(attributeName);
out.writeGenericValue(weights);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,60 @@

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.test.AbstractXContentTestCase;
import org.opensearch.test.AbstractDiffableSerializationTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class WeightedRoutingMetadataTests extends AbstractXContentTestCase<WeightedRoutingMetadata> {
public class WeightedRoutingMetadataTests extends AbstractDiffableSerializationTestCase<Metadata.Custom> {

@Override
protected Writeable.Reader<Metadata.Custom> instanceReader() {
return WeightedRoutingMetadata::new;
}

@Override
protected WeightedRoutingMetadata createTestInstance() {
String attributeName = "zone";
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
if (randomBoolean()) {
weights = new HashMap<>();
attributeName = "";
}
WeightedRouting weightedRouting = new WeightedRouting(attributeName, weights);
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1);

return weightedRoutingMetadata;
}

@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
}

@Override
protected WeightedRoutingMetadata doParseInstance(XContentParser parser) throws IOException {
return WeightedRoutingMetadata.fromXContent(parser);
}

@Override
protected boolean supportsUnknownFields() {
return false;
protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) {

WeightedRouting weightedRouting = new WeightedRouting("", new HashMap<>());
WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1);
return weightedRoutingMetadata;
}

@Override
protected Writeable.Reader<Diff<Metadata.Custom>> diffReader() {
return WeightedRoutingMetadata::readDiffFrom;
}

}

0 comments on commit cf1a0a2

Please sign in to comment.