Commit
Prefer remote replicas for primary promotion during migration (#13136) (#13377)

Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
(cherry picked from commit efa06fe)
ltaragi committed Apr 25, 2024
1 parent 443dd81 commit 64909a3
Showing 4 changed files with 316 additions and 5 deletions.
@@ -439,6 +439,141 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
);
}

/*
Scenario:
- Starts 1 docrep backed data node
- Creates an index with 0 replica
- Starts 1 remote backed data node
- Moves primary copy from docrep to remote through _cluster/reroute
- Starts 1 more remote backed data node
- Expands index to 2 replicas, one each on new remote node and docrep node
- Stops remote enabled node hosting the primary
- Ensures remote replica gets promoted to primary
- Ensures doc count is the same after failover
- Indexes some more docs to verify that the failed-over primary can ingest them
*/
public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data node");
String docrepNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

logger.info("---> Creating index with 0 replica");
createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
initDocRepToRemoteMigration();

logger.info("---> Starting 1 remote enabled data node");
addRemote = true;
String remoteNodeName1 = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(
internalCluster().client()
.admin()
.cluster()
.prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME)
.get()
.repositories()
.size(),
2
);

logger.info("---> Starting doc ingestion in parallel thread");
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_REMOTE);
asyncIndexingService.startIndexing();

logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", docrepNodeName, remoteNodeName1);
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, docrepNodeName, remoteNodeName1))
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
assertEquals(primaryNodeName(FAILOVER_REMOTE_TO_REMOTE), remoteNodeName1);

logger.info("---> Starting 1 more remote enabled data node");
String remoteNodeName2 = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();

logger.info("---> Expanding index to 2 replica copies, on docrepNode and remoteNode2");
assertAcked(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings()
.setIndices(FAILOVER_REMOTE_TO_REMOTE)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build())
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);

logger.info("---> Stopping indexing thread");
asyncIndexingService.stopIndexing();

refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
.admin()
.indices()
.prepareStats(FAILOVER_REMOTE_TO_REMOTE)
.setDocs(true)
.get()
.asMap();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
long initialPrimaryDocCount = 0;
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
if (shardRouting.primary()) {
assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
}
}
int firstBatch = (int) asyncIndexingService.getIndexedDocs();
assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, firstBatch, 0);

logger.info("---> Stop remote store enabled node hosting the primary");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName1));
ensureStableCluster(3);
ensureYellow(FAILOVER_REMOTE_TO_REMOTE);
DiscoveryNodes finalNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();

waitUntil(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
String nodeId = clusterState.getRoutingTable().index(FAILOVER_REMOTE_TO_REMOTE).shard(0).primaryShard().currentNodeId();
if (nodeId == null) {
return false;
} else {
assertEquals(finalNodes.get(nodeId).getName(), remoteNodeName2);
return finalNodes.get(nodeId).isRemoteStoreNode();
}
});

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
long primaryDocCountAfterFailover = 0;
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
if (shardRouting.primary()) {
assertTrue(finalNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
primaryDocCountAfterFailover = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
}
}
assertEquals(initialPrimaryDocCount, primaryDocCountAfterFailover);

logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs");
int secondBatch = randomIntBetween(1, 10);
logger.info("---> Indexing {} more docs", secondBatch);
indexBulk(FAILOVER_REMOTE_TO_REMOTE, secondBatch);
refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
assertEquals(2, shardStatsMap.size());
shardStatsMap.forEach(
(shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); }
);
}

/*
Scenario:
- Starts 1 docrep backed data node
@@ -67,6 +67,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore;

/**
* {@link RoutingNodes} represents a copy of the routing information contained in the {@link ClusterState cluster state}.
* It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
@@ -418,6 +420,20 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) {
.orElse(null);
}

/**
* Returns one active replica shard on a remote node for the given shard id or <code>null</code> if
* no such replica is found.
* <p>
* Since we aim to keep the migration moving forward, replicas that have already migrated to
* remote-store nodes are preferred for primary promotion.
*/
public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) {
return assignedShards(shardId).stream().filter(shr -> !shr.primary() && shr.active()).filter((shr) -> {
RoutingNode nd = node(shr.currentNodeId());
return (nd != null && nd.node().isRemoteStoreNode());
}).findFirst().orElse(null);
}
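
For intuition, here is a minimal, standalone sketch of the selection predicate implemented above. The Shard record, RemoteReplicaSelectionSketch class, and pickRemoteReplica method are simplified stand-ins invented for this illustration, not OpenSearch types:

import java.util.List;
import java.util.Optional;

// Simplified stand-in for a ShardRouting plus the remote-store flag of its hosting node.
record Shard(String id, boolean primary, boolean active, boolean onRemoteNode) {}

class RemoteReplicaSelectionSketch {
    // Mirrors activeReplicaOnRemoteNode: the first active, non-primary copy
    // hosted on a remote-store node, or empty if none exists.
    static Optional<Shard> pickRemoteReplica(List<Shard> assignedShards) {
        return assignedShards.stream()
            .filter(s -> !s.primary() && s.active())
            .filter(Shard::onRemoteNode)
            .findFirst();
    }

    public static void main(String[] args) {
        List<Shard> assigned = List.of(
            new Shard("p", true, true, true),    // failed primary: skipped (primary)
            new Shard("r1", false, true, false), // docrep replica: skipped (not on a remote node)
            new Shard("r2", false, true, true)   // remote replica: preferred for promotion
        );
        System.out.println(pickRemoteReplica(assigned).map(Shard::id).orElse("none")); // prints "r2"
    }
}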

/**
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
*/
@@ -735,11 +751,17 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
RoutingChangesObserver routingChangesObserver
) {
assert failedShard.primary();
-        ShardRouting activeReplica;
-        if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
-            activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
-        } else {
-            activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
+        ShardRouting activeReplica = null;
+        if (isMigratingToRemoteStore(metadata)) {
+            // we might not find any replica on remote node
+            activeReplica = activeReplicaOnRemoteNode(failedShard.shardId());
+        }
+        if (activeReplica == null) {
+            if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
+                activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
+            } else {
+                activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
+            }
}
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
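Taken together, the promotion order with this change is: during a remote-store migration, prefer an active replica on a remote node; otherwise fall back to the existing version-based rules (oldest node version for segment-replication indices, highest node version otherwise); if no active replica is found at all, the primary moves to unassigned. A hedged restatement of that decision order follows. The helper names come from the diff above, but the wrapper method selectReplicaForPromotion is illustrative only, not code from this commit:

// Illustrative paraphrase of the selection order in unassignPrimaryAndPromoteActiveReplicaIfExists.
ShardRouting selectReplicaForPromotion(ShardRouting failedShard, Metadata metadata) {
    ShardRouting activeReplica = null;
    if (isMigratingToRemoteStore(metadata)) {
        // New in this change: prefer a replica that already lives on a remote-store node.
        activeReplica = activeReplicaOnRemoteNode(failedShard.shardId());
    }
    if (activeReplica == null) {
        // Pre-existing behavior: pick a replica by node version.
        activeReplica = metadata.isSegmentReplicationEnabled(failedShard.getIndexName())
            ? activeReplicaWithOldestVersion(failedShard.shardId())
            : activeReplicaWithHighestVersion(failedShard.shardId());
    }
    return activeReplica; // null here means the caller moves the shard to unassigned
}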
@@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
@@ -238,4 +239,17 @@ public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings)

return (isMixedMode && isRemoteStoreMigrationDirection);
}

/**
* Checks whether the cluster is undergoing a remote store migration, based on the cluster state metadata
* @return
* <code>true</code> for the <code>REMOTE_STORE</code> migration direction in <code>MIXED</code> compatibility mode,
* <code>false</code> otherwise
*/
public static boolean isMigratingToRemoteStore(Metadata metadata) {
boolean isMixedMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings()).equals(CompatibilityMode.MIXED);
boolean isRemoteStoreMigrationDirection = MIGRATION_DIRECTION_SETTING.get(metadata.settings()).equals(Direction.REMOTE_STORE);

return (isMixedMode && isRemoteStoreMigrationDirection);
}
}
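
A short usage sketch of the new overload. Both overloads answer the same question from different sources: the ClusterSettings variant reads live cluster settings, while this Metadata variant reads the settings persisted in cluster state. The MigrationCheckSketch class and shouldPreferRemoteReplicas method below are hypothetical names for illustration:

import org.opensearch.cluster.ClusterState;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore;

class MigrationCheckSketch {
    // true only in MIXED compatibility mode with the REMOTE_STORE migration direction
    static boolean shouldPreferRemoteReplicas(ClusterState clusterState) {
        return isMigratingToRemoteStore(clusterState.metadata());
    }
}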
@@ -40,27 +40,37 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.test.VersionUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
@@ -812,4 +822,134 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) {
}
}
}

public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
AllocationService allocation = createAllocationService(Settings.builder().build());

// segment replication enabled
Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);

// remote store migration metadata settings
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(4))
.persistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED.mode)
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE.direction)
.build()
)
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(initialRoutingTable)
.build();

ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0);

// add a remote node and start primary shard
Map<String, String> remoteStoreNodeAttributes = Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE"
);
DiscoveryNode remoteNode1 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(remoteNode1)).build();
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));

clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));

// add remote and non-remote nodes and start replica shards
DiscoveryNode remoteNode2 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNode remoteNode3 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNode nonRemoteNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode nonRemoteNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
List<DiscoveryNode> replicaShardNodes = List.of(remoteNode2, remoteNode3, nonRemoteNode1, nonRemoteNode2);

for (int i = 0; i < 4; i++) {
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(replicaShardNodes.get(i)))
.build();

clusterState = allocation.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + i));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));

clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + (i + 1)));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));
}

// fail primary shard
ShardRouting primaryShard0 = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShard0, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica exists on a remote node
assertEquals(4, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting1 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShard0, primaryShardRouting1);
assertTrue(
primaryShardRouting1.currentNodeId().equals(remoteNode2.getId())
|| primaryShardRouting1.currentNodeId().equals(remoteNode3.getId())
);

// fail primary shard again
newState = allocation.applyFailedShard(clusterState, primaryShardRouting1, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica again exists on a remote node
assertEquals(3, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting2 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShardRouting1, primaryShardRouting2);
assertTrue(
primaryShardRouting2.currentNodeId().equals(remoteNode2.getId())
|| primaryShardRouting2.currentNodeId().equals(remoteNode3.getId())
);
assertNotEquals(primaryShardRouting1.currentNodeId(), primaryShardRouting2.currentNodeId());

ShardRouting expectedCandidateForSegRep = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId);

// fail primary shard again
newState = allocation.applyFailedShard(clusterState, primaryShardRouting2, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica exists on a non-remote node
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting3 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShardRouting2, primaryShardRouting3);
assertTrue(
primaryShardRouting3.currentNodeId().equals(nonRemoteNode1.getId())
|| primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId())
);
assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId());
}
}
