Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <lakshya.taragi@gmail.com>
  • Loading branch information
ltaragi committed Apr 22, 2024
1 parent 9d60253 commit 3dea5ff
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,141 @@ public void testFailoverRemotePrimaryToDocrepReplica() throws Exception {
);
}

/*
Scenario:
- Starts 1 docrep backed data node
- Creates an index with 0 replica
- Starts 1 remote backed data node
- Moves primary copy from docrep to remote through _cluster/reroute
- Starts 1 more remote backed data node
- Expands index to 2 replicas, one each on new remote node and docrep node
- Stops remote enabled node hosting the primary
- Ensures remote replica gets promoted to primary
- Ensures doc count is same after failover
- Indexes some more docs to ensure working of failed-over primary
*/
public void testFailoverRemotePrimaryToRemoteReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 1 docrep data node");
String docrepNodeName = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(internalCluster().client().admin().cluster().prepareGetRepositories().get().repositories().size(), 0);

logger.info("---> Creating index with 0 replica");
createIndex(FAILOVER_REMOTE_TO_REMOTE, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
initDocRepToRemoteMigration();

logger.info("---> Starting 1 remote enabled data node");
addRemote = true;
String remoteNodeName1 = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();
assertEquals(
internalCluster().client()
.admin()
.cluster()
.prepareGetRepositories(REPOSITORY_NAME, REPOSITORY_2_NAME)
.get()
.repositories()
.size(),
2
);

logger.info("---> Starting doc ingestion in parallel thread");
AsyncIndexingService asyncIndexingService = new AsyncIndexingService(FAILOVER_REMOTE_TO_REMOTE);
asyncIndexingService.startIndexing();

logger.info("---> Moving primary copy from docrep node {} to remote enabled node {}", docrepNodeName, remoteNodeName1);
assertAcked(
internalCluster().client()
.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(FAILOVER_REMOTE_TO_REMOTE, 0, docrepNodeName, remoteNodeName1))
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);
assertEquals(primaryNodeName(FAILOVER_REMOTE_TO_REMOTE), remoteNodeName1);

logger.info("---> Starting 1 more remote enabled data node");
String remoteNodeName2 = internalCluster().startDataOnlyNode();
internalCluster().validateClusterFormed();

logger.info("---> Expanding index to 2 replica copies, on docrepNode and remoteNode2");
assertAcked(
internalCluster().client()
.admin()
.indices()
.prepareUpdateSettings()
.setIndices(FAILOVER_REMOTE_TO_REMOTE)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2).build())
.get()
);
ensureGreen(FAILOVER_REMOTE_TO_REMOTE);

logger.info("---> Stopping indexing thread");
asyncIndexingService.stopIndexing();

refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);
Map<ShardRouting, ShardStats> shardStatsMap = internalCluster().client()
.admin()
.indices()
.prepareStats(FAILOVER_REMOTE_TO_REMOTE)
.setDocs(true)
.get()
.asMap();
DiscoveryNodes nodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();
long initialPrimaryDocCount = 0;
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
if (shardRouting.primary()) {
assertTrue(nodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
initialPrimaryDocCount = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
}
}
int firstBatch = (int) asyncIndexingService.getIndexedDocs();
assertReplicaAndPrimaryConsistency(FAILOVER_REMOTE_TO_REMOTE, firstBatch, 0);

logger.info("---> Stop remote store enabled node hosting the primary");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName1));
ensureStableCluster(3);
ensureYellow(FAILOVER_REMOTE_TO_REMOTE);
DiscoveryNodes finalNodes = internalCluster().client().admin().cluster().prepareState().get().getState().getNodes();

waitUntil(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
String nodeId = clusterState.getRoutingTable().index(FAILOVER_REMOTE_TO_REMOTE).shard(0).primaryShard().currentNodeId();
if (nodeId == null) {
return false;
} else {
assertEquals(finalNodes.get(nodeId).getName(), remoteNodeName2);
return finalNodes.get(nodeId).isRemoteStoreNode();
}
});

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
long primaryDocCountAfterFailover = 0;
for (ShardRouting shardRouting : shardStatsMap.keySet()) {
if (shardRouting.primary()) {
assertTrue(finalNodes.get(shardRouting.currentNodeId()).isRemoteStoreNode());
primaryDocCountAfterFailover = shardStatsMap.get(shardRouting).getStats().getDocs().getCount();
}
}
assertEquals(initialPrimaryDocCount, primaryDocCountAfterFailover);

logger.info("---> Index some more docs to ensure that the failed over primary is ingesting new docs");
int secondBatch = randomIntBetween(1, 10);
logger.info("---> Indexing {} more docs", secondBatch);
indexBulk(FAILOVER_REMOTE_TO_REMOTE, secondBatch);
refreshAndWaitForReplication(FAILOVER_REMOTE_TO_REMOTE);

shardStatsMap = internalCluster().client().admin().indices().prepareStats(FAILOVER_REMOTE_TO_REMOTE).setDocs(true).get().asMap();
assertEquals(2, shardStatsMap.size());
shardStatsMap.forEach(
(shardRouting, shardStats) -> { assertEquals(firstBatch + secondBatch, shardStats.getStats().getDocs().getCount()); }
);
}

/*
Scenario:
- Starts 1 docrep backed data node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.core.Assertions;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -62,7 +61,6 @@
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -430,19 +428,10 @@ public ShardRouting activeReplicaWithOldestVersion(ShardId shardId) {
* are preferred for primary promotion
*/
public ShardRouting activeReplicaOnRemoteNode(ShardId shardId) {
List<ShardRouting> replicaShardsOnRemote = assignedShards(shardId).stream()
.filter(shr -> !shr.primary() && shr.active())
.filter((shr) -> {
RoutingNode nd = node(shr.currentNodeId());
return (nd != null && nd.node().isRemoteStoreNode());
})
.collect(Collectors.toList());
ShardRouting replicaShard = null;
if (replicaShardsOnRemote.isEmpty() == false) {
Random rand = Randomness.get();
replicaShard = replicaShardsOnRemote.get(rand.nextInt(replicaShardsOnRemote.size()));
}
return replicaShard;
return assignedShards(shardId).stream().filter(shr -> !shr.primary() && shr.active()).filter((shr) -> {
RoutingNode nd = node(shr.currentNodeId());
return (nd != null && nd.node().isRemoteStoreNode());
}).findFirst().orElse(null);
}

/**
Expand Down Expand Up @@ -763,7 +752,7 @@ private void unassignPrimaryAndPromoteActiveReplicaIfExists(
) {
assert failedShard.primary();
ShardRouting activeReplica = null;
if (isMigratingToRemoteStore(new ClusterSettings(metadata.settings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
if (isMigratingToRemoteStore(metadata.settings())) {
// we might not find any replica on remote node
activeReplica = activeReplicaOnRemoteNode(failedShard.shardId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -238,4 +239,17 @@ public static boolean isMigratingToRemoteStore(ClusterSettings clusterSettings)

return (isMixedMode && isRemoteStoreMigrationDirection);
}

/**
* To check if the cluster is undergoing remote store migration using metadata settings
* @return
* <code>true</code> For <code>REMOTE_STORE</code> migration direction and <code>MIXED</code> compatibility mode,
* <code>false</code> otherwise
*/
public static boolean isMigratingToRemoteStore(Settings settings) {
boolean isMixedMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).equals(CompatibilityMode.MIXED);
boolean isRemoteStoreMigrationDirection = MIGRATION_DIRECTION_SETTING.get(settings).equals(Direction.REMOTE_STORE);

return (isMixedMode && isRemoteStoreMigrationDirection);
}
}

0 comments on commit 3dea5ff

Please sign in to comment.