Prefer remote replicas for primary promotion during migration #13136

Merged · 3 commits · Apr 23, 2024
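In brief, per the changes below: when a cluster is in mixed compatibility mode and migrating toward the remote store, primary promotion now prefers active replicas that already live on remote-store-enabled nodes, and falls back to the existing version-based replica selection when no such replica exists.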
@@ -439,6 +439,141 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
);
}

/*
Scenario:
- Starts 1 docrep backed data node
- Creates an index with 0 replica
- Starts 1 remote backed data node
- Moves primary copy from docrep to remote through _cluster/reroute
- Starts 1 more remote backed data node
- Expands index to 2 replicas, one each on new remote node and docrep node
- Stops remote enabled node hosting the primary
- Ensures remote replica gets promoted to primary
- Ensures doc count is the same after failover
- Indexes some more docs to ensure the failed-over primary is working
*/
public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data node");
String docrepNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(0, internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size());

logger.info("---> Creating index with 0 replica");
createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
initDocRepToRemoteMigration();

logger.info("---> Starting 1 remote enabled data node");
addRemote = true;
String remoteNodeName1 = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(
2,
internalCluster().client()
.admin()
.cluster()
.prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME)
.get()
.repositories()
.size()
);

logger.info("---> Starting doc ingestion in parallel thread");
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_REMOTE);
asyncIndexingService.startIndexing();

logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", docrepNodeName, remoteNodeName1);
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, docrepNodeName, remoteNodeName1))
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
assertEquals(primaryNodeName(FAILOVER_REMOTE_TO_REMOTE), remoteNodeName1);

logger.info("---> Starting 1 more remote enabled data node");
String remoteNodeName2 = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();

logger.info("---> Expanding index to 2 replica copies, on docrepNode and remoteNode2");
assertAcked(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings()
.setIndices(FAILOVER_REMOTE_TO_REMOTE)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build())
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);

logger.info("---> Stopping indexing thread");
asyncIndexingService.stopIndexing();

refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
.admin()
.indices()
.prepareStats(FAILOVER_REMOTE_TO_REMOTE)
.setDocs(true)
.get()
.asMap();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
long initialPrimaryDocCount = 0;
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
if (shardRouting.primary()) {
assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
}
}
int firstBatch = (int) asyncIndexingService.getIndexedDocs();
assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, firstBatch, 0);

logger.info("---> Stop remote store enabled node hosting the primary");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName1));
ensureStableCluster(3);
ensureYellow(FAILOVER_REMOTE_TO_REMOTE);
DiscoveryNodes finalNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();

waitUntil(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
String nodeId = clusterState.getRoutingTable().index(FAILOVER_REMOTE_TO_REMOTE).shard(0).primaryShard().currentNodeId();
if (nodeId == null) {
return false;
} else {
assertEquals(finalNodes.get(nodeId).getName(), remoteNodeName2);
return finalNodes.get(nodeId).isRemoteStoreNode();
}
});

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
long primaryDocCountAfterFailover = 0;
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
if (shardRouting.primary()) {
assertTrue(finalNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
primaryDocCountAfterFailover = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
}
}
assertEquals(initialPrimaryDocCount, primaryDocCountAfterFailover);

logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs");
int secondBatch = randomIntBetween(1, 10);
logger.info("---> Indexing {} more docs", secondBatch);
indexBulk(FAILOVER_REMOTE_TO_REMOTE, secondBatch);
refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
assertEquals(2, shardStatsMap.size());
shardStatsMap.forEach(
(shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); }
);
}
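The test above drives concurrent ingestion through AsyncIndexingService, a helper that is not part of this diff. Below is a minimal sketch of the assumed shape, matching only the calls used in the test (startIndexing, stopIndexing, getIndexedDocs); the real helper lives in the migration test base class and may differ.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: mirrors the usage in the test above; the per-document
// indexing call itself is omitted and would go through the test client.
class AsyncIndexingService {
    private final String indexName;
    private final AtomicLong indexedDocs = new AtomicLong();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private Thread worker;

    AsyncIndexingService(String indexName) {
        this.indexName = indexName;
    }

    void startIndexing() {
        worker = new Thread(() -> {
            while (stopped.get() == false) {
                // index a single doc into indexName via the test client here (omitted)
                indexedDocs.incrementAndGet();
            }
        }, "async-indexer-" + indexName);
        worker.start();
    }

    void stopIndexing() throws InterruptedException {
        stopped.set(true);
        worker.join();
    }

    long getIndexedDocs() {
        return indexedDocs.get();
    }
}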

/*
Scenario:
- Starts 1 docrep backed data node
@@ -67,6 +67,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.isMigratingToRemoteStore;

/**
* {@link RoutingNodes} represents a copy of the routing information contained in the {@link ClusterState cluster state}.
* It can be either initialized as mutable or immutable (see {@link #RoutingNodes(ClusterState, boolean)}), allowing
@@ -418,6 +420,20 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) {
.orElse(null);
}

/**
* Returns one active replica shard on a remote node for the given shard id, or <code>null</code> if
* no such replica is found.
* <p>
* Since we aim to keep moving forward during a remote store migration, replicas that have already
* migrated to remote nodes are preferred for primary promotion.
*/
public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) {
return assignedShards(shardId).stream().filter(shr -> !shr.primary() && shr.active()).filter((shr) -> {
RoutingNode nd = node(shr.currentNodeId());
return (nd != null && nd.node().isRemoteStoreNode());
}).findFirst().orElse(null);
}

/**
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
*/
@@ -735,11 +751,17 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
RoutingChangesObserver routingChangesObserver
) {
assert failedShard.primary();
- ShardRouting activeReplica;
- if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
- activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
- } else {
- activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
+ ShardRouting activeReplica = null;
+ if (isMigratingToRemoteStore(metadata)) {
+ // we might not find any replica on a remote node
+ activeReplica = activeReplicaOnRemoteNode(failedShard.shardId());
}
+ if (activeReplica == null) {
+ if (metadata.isSegmentReplicationEnabled(failedShard.getIndexName())) {
+ activeReplica = activeReplicaWithOldestVersion(failedShard.shardId());
+ } else {
+ activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
+ }
+ }
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
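Net effect of this hunk: during a mixed-mode migration to the remote store, promotion first looks for an active replica on a remote node and only then falls back to the pre-existing version-based choice. A condensed, illustrative restatement of the flow (not verbatim from the file):

// Illustrative only: selection chain after this change, using the
// same methods shown in the hunk above.
ShardRouting candidate = null;
if (isMigratingToRemoteStore(metadata)) {
    candidate = activeReplicaOnRemoteNode(failedShard.shardId());   // may be null
}
if (candidate == null) {
    candidate = metadata.isSegmentReplicationEnabled(failedShard.getIndexName())
        ? activeReplicaWithOldestVersion(failedShard.shardId())     // segrep: oldest version
        : activeReplicaWithHighestVersion(failedShard.shardId());   // docrep: highest version
}
// if candidate is still null, the failed primary is moved to unassigned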
@@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
@@ -238,4 +239,17 @@ public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings)

return (isMixedMode && isRemoteStoreMigrationDirection);
}

/**
* Checks whether the cluster is undergoing remote store migration, based on the cluster state {@link Metadata}.
* @return <code>true</code> for <code>REMOTE_STORE</code> migration direction and <code>MIXED</code> compatibility mode,
* <code>false</code> otherwise
*/
public static boolean isMigratingToRemoteStore(Metadata metadata) {
boolean isMixedMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings()).equals(CompatibilityMode.MIXED);
boolean isRemoteStoreMigrationDirection = MIGRATION_DIRECTION_SETTING.get(metadata.settings()).equals(Direction.REMOTE_STORE);

return (isMixedMode && isRemoteStoreMigrationDirection);
}
}
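For reference, isMigratingToRemoteStore(Metadata) returns true once both persistent settings below are applied. A hypothetical sketch using the same setting constants as the unit test further down; the client plumbing shown here is an assumption, not part of this diff:

// Hypothetical: apply the two persistent settings this check reads.
// Setting keys come from RemoteStoreNodeService; `client` is assumed.
Settings migrationSettings = Settings.builder()
    .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), CompatibilityMode.MIXED.mode)
    .put(MIGRATION_DIRECTION_SETTING.getKey(), Direction.REMOTE_STORE.direction)
    .build();
client.admin().cluster().prepareUpdateSettings().setPersistentSettings(migrationSettings).get();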
@@ -40,27 +40,37 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.test.VersionUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
@@ -812,4 +822,134 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) {
}
}
}

public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() {
FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build());
AllocationService allocation = createAllocationService(Settings.builder().build());

// segment replication enabled
Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT);

// remote store migration metadata settings
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settingsBuilder).numberOfShards(1).numberOfReplicas(4))
.persistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED.mode)
.put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE.direction)
.build()
)
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();

ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(initialRoutingTable)
.build();

ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0);

// add a remote node and start primary shard
Map<String, String> remoteStoreNodeAttributes = Map.of(
REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY,
"REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE"
);
DiscoveryNode remoteNode1 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(remoteNode1)).build();
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build();
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));

clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4));

// add remote and non-remote nodes and start replica shards
DiscoveryNode remoteNode2 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNode remoteNode3 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
remoteStoreNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);
DiscoveryNode nonRemoteNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode nonRemoteNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
List<DiscoveryNode> replicaShardNodes = List.of(remoteNode2, remoteNode3, nonRemoteNode1, nonRemoteNode2);

for (int i = 0; i < 4; i++) {
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(replicaShardNodes.get(i)))
.build();

clusterState = allocation.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + i));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));

clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1 + (i + 1)));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(4 - (i + 1)));
}

// fail primary shard
ShardRouting primaryShard0 = clusterState.routingTable().index("test").shard(0).primaryShard();
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShard0, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica exists on a remote node
assertEquals(4, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting1 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShard0, primaryShardRouting1);
assertTrue(
primaryShardRouting1.currentNodeId().equals(remoteNode2.getId())
|| primaryShardRouting1.currentNodeId().equals(remoteNode3.getId())
);

// fail primary shard again
newState = allocation.applyFailedShard(clusterState, primaryShardRouting1, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica again exists on a remote node
assertEquals(3, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting2 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShardRouting1, primaryShardRouting2);
assertTrue(
primaryShardRouting2.currentNodeId().equals(remoteNode2.getId())
|| primaryShardRouting2.currentNodeId().equals(remoteNode3.getId())
);
assertNotEquals(primaryShardRouting1.currentNodeId(), primaryShardRouting2.currentNodeId());

ShardRouting expectedCandidateForSegRep = clusterState.getRoutingNodes().activeReplicaWithOldestVersion(shardId);

// fail primary shard again
newState = allocation.applyFailedShard(clusterState, primaryShardRouting2, randomBoolean());
assertNotEquals(clusterState, newState);
clusterState = newState;

// verify that promoted replica exists on a non-remote node
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(STARTED).size());
ShardRouting primaryShardRouting3 = clusterState.routingTable().index("test").shard(0).primaryShard();
assertNotEquals(primaryShardRouting2, primaryShardRouting3);
assertTrue(
primaryShardRouting3.currentNodeId().equals(nonRemoteNode1.getId())
|| primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId())
);
assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId());
}
}