Skip to content

Commit

Permalink
Serializing node attribute in discoveryNode only in scenarioes where …
Browse files Browse the repository at this point in the history
…it is required

Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
  • Loading branch information
RS146BIJAY committed Aug 21, 2024
1 parent ef87b39 commit b2d6402
Show file tree
Hide file tree
Showing 35 changed files with 271 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
}

public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
writeOptionalWriteable((out, writable)-> writable.writeTo(out), writeable);
}

public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
writer.write(this, writeable);
} else {
writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,21 @@

package org.opensearch.remotemigration;

import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteIndexPath;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -471,80 +462,80 @@ public void testRemotePathMetadataAddedWithFirstPrimaryMovingToRemote() throws E
* exclude docrep nodes, assert that remote index path file exists
* when shards start relocating to the remote nodes.
*/
public void testRemoteIndexPathFileExistsAfterMigration() throws Exception {
String docrepClusterManager = internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 2 docrep nodes");
addRemote = false;
internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build());
internalCluster().validateClusterFormed();

logger.info("---> Creating index with 1 primary and 1 replica");
String indexName = "migration-index";
Settings oneReplica = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
createIndexAndAssertDocrepProperties(indexName, oneReplica);

String indexUUID = internalCluster().client()
.admin()
.indices()
.prepareGetSettings(indexName)
.get()
.getSetting(indexName, IndexMetadata.SETTING_INDEX_UUID);

logger.info("---> Starting indexing in parallel");
AsyncIndexingService indexingService = new AsyncIndexingService(indexName);
indexingService.startIndexing();

logger.info("---> Adding 2 remote enabled nodes to the cluster & cluster manager");
initDocRepToRemoteMigration();
addRemote = true;
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build());
internalCluster().validateClusterFormed();

assertTrue(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX)
)
.get()
.isAcknowledged()
);

// elect cluster manager with remote-cluster state enabled
internalCluster().client()
.execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(docrepClusterManager))
.get();

internalCluster().validateClusterFormed();

logger.info("---> Excluding docrep nodes from allocation");
excludeNodeSet("type", "docrep");

waitForRelocation();
waitNoPendingTasksOnAll();
indexingService.stopIndexing();

// validate remote index path file exists
logger.info("---> Asserting remote index path file exists");
String fileNamePrefix = String.join(RemoteIndexPathUploader.DELIMITER, indexUUID, "7", RemoteIndexPath.DEFAULT_VERSION);

assertTrue(FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR)));
Path[] files = FileSystemUtils.files(translogRepoPath.resolve(RemoteIndexPath.DIR));
assertEquals(1, files.length);
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix)));

assertTrue(FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
files = FileSystemUtils.files(segmentRepoPath.resolve(RemoteIndexPath.DIR));
assertEquals(1, files.length);
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix)));
}
// public void testRemoteIndexPathFileExistsAfterMigration() throws Exception {
// String docrepClusterManager = internalCluster().startClusterManagerOnlyNode();
//
// logger.info("---> Starting 2 docrep nodes");
// addRemote = false;
// internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "docrep").build());
// internalCluster().validateClusterFormed();
//
// logger.info("---> Creating index with 1 primary and 1 replica");
// String indexName = "migration-index";
// Settings oneReplica = Settings.builder()
// .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
// .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
// .build();
// createIndexAndAssertDocrepProperties(indexName, oneReplica);
//
// String indexUUID = internalCluster().client()
// .admin()
// .indices()
// .prepareGetSettings(indexName)
// .get()
// .getSetting(indexName, IndexMetadata.SETTING_INDEX_UUID);
//
// logger.info("---> Starting indexing in parallel");
// AsyncIndexingService indexingService = new AsyncIndexingService(indexName);
// indexingService.startIndexing();
//
// logger.info("---> Adding 2 remote enabled nodes to the cluster & cluster manager");
// initDocRepToRemoteMigration();
// addRemote = true;
// internalCluster().startClusterManagerOnlyNode();
// internalCluster().startDataOnlyNodes(2, Settings.builder().put("node.attr._type", "remote").build());
// internalCluster().validateClusterFormed();
//
// assertTrue(
// internalCluster().client()
// .admin()
// .cluster()
// .prepareUpdateSettings()
// .setPersistentSettings(
// Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.HASHED_PREFIX)
// )
// .get()
// .isAcknowledged()
// );
//
// // elect cluster manager with remote-cluster state enabled
// internalCluster().client()
// .execute(AddVotingConfigExclusionsAction.INSTANCE, new AddVotingConfigExclusionsRequest(docrepClusterManager))
// .get();
//
// internalCluster().validateClusterFormed();
//
// logger.info("---> Excluding docrep nodes from allocation");
// excludeNodeSet("type", "docrep");
//
// waitForRelocation();
// waitNoPendingTasksOnAll();
// indexingService.stopIndexing();
//
// // validate remote index path file exists
// logger.info("---> Asserting remote index path file exists");
// String fileNamePrefix = String.join(RemoteIndexPathUploader.DELIMITER, indexUUID, "7", RemoteIndexPath.DEFAULT_VERSION);
//
// assertTrue(FileSystemUtils.exists(translogRepoPath.resolve(RemoteIndexPath.DIR)));
// Path[] files = FileSystemUtils.files(translogRepoPath.resolve(RemoteIndexPath.DIR));
// assertEquals(1, files.length);
// assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix)));
//
// assertTrue(FileSystemUtils.exists(segmentRepoPath.resolve(RemoteIndexPath.DIR)));
// files = FileSystemUtils.files(segmentRepoPath.resolve(RemoteIndexPath.DIR));
// assertEquals(1, files.length);
// assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix)));
// }

/**
* Scenario:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
out.writeOptionalWriteable(currentNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode);
out.writeOptionalWriteable(relocationTargetNode);
out.writeOptionalWriteable(clusterInfo);
shardAllocationDecision.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ public ActionRequestValidationException validate() {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(nodesIds);
out.writeOptionalArray(concreteNodes);
if (shouldIncludeAllAttribute()) {
out.writeOptionalArray((output, value) -> value.writeToWithAttribute(output), concreteNodes);
} else {
out.writeOptionalArray(concreteNodes);
}

out.writeOptionalTimeValue(timeout);
}
}
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(stateUUID);
metadata.writeTo(out);
routingTable.writeTo(out);
nodes.writeTo(out);
nodes.writeToWithAttribute(out);
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
Expand Down Expand Up @@ -859,13 +859,23 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(toUuid);
out.writeLong(toVersion);
routingTable.writeTo(out);
nodes.writeTo(out);
nodesWriteToWithAttributes(nodes, out);
metadata.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager);
}

private void nodesWriteToWithAttributes(Diff<DiscoveryNodes> nodes, StreamOutput out) throws IOException {
DiscoveryNodes part = nodes.apply(null);
if (part != null) {
out.writeBoolean(true);
part.writeToWithAttribute(out);
} else {
out.writeBoolean(false);
}
}

@Override
public ClusterState apply(ClusterState state) {
Builder builder = new Builder(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ public FollowerCheckRequest(final StreamInput in) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(term);
sender.writeTo(out);
sender.writeToWithAttribute(out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
targetNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
targetNode.writeToWithAttribute(out);
out.writeLong(term);
out.writeLong(lastAcceptedTerm);
out.writeLong(lastAcceptedVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public JoinRequest(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(minimumTerm);
out.writeOptionalWriteable(optionalJoin.orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ static class LeaderCheckRequest extends TransportRequest {
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
sender.writeTo(out);
sender.writeToWithAttribute(out);
}

public DiscoveryNode getSender() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public PreVoteRequest(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(currentTerm);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(term);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ abstract class TermVersionRequest extends TransportRequest implements Writeable
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(term);
out.writeLong(version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static boolean isDedicatedSearchNode(Settings settings) {
private final Map<String, String> attributes;
private final Version version;
private final SortedSet<DiscoveryNodeRole> roles;
private static final String ZONE = "zone";

/**
* Creates a new {@link DiscoveryNode}
Expand Down Expand Up @@ -358,17 +359,37 @@ public DiscoveryNode(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
writeToUtil(out, false);
}

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil(out, true);
}

private void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
out.writeString(nodeName);
out.writeString(nodeId);
out.writeString(ephemeralId);
out.writeString(hostName);
out.writeString(hostAddress);
address.writeTo(out);
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
if (includeAllAttributes) {
serializeAttributes(attributes, out);
} else {
// Serialize only remote store and zone (needed for SearchWeightedRouting) attributes if present.
final Map<String, String> filteredAttributes = attributes.entrySet()
.stream()
.filter(
entry -> entry.getKey().startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)
|| entry.getKey().equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)
|| entry.getKey().equals(REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY)
|| entry.getKey().equals(ZONE)
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

serializeAttributes(filteredAttributes, out);
}

out.writeVInt(roles.size());
for (final DiscoveryNodeRole role : roles) {
final DiscoveryNodeRole compatibleRole = role.getCompatibilityRole(out.getVersion());
Expand All @@ -379,6 +400,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVersion(version);
}

private void serializeAttributes(final Map<String, String> attributes, final StreamOutput out) throws IOException {
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
}

/**
* The address that the node can be communicated with.
*/
Expand Down
Loading

0 comments on commit b2d6402

Please sign in to comment.