Skip to content

Commit

Permalink
Bugfix: add replica information in remote store restore flow (#8951)
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale committed Aug 28, 2023
1 parent a1d8beb commit 569d5c2
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAMES = "test-remote-store-1,test-remote-store-2,remote-store-test-index-1,remote-store-test-index-2";
private static final String INDEX_NAMES_WILDCARD = "test-remote-store-*,remote-store-test-index-*";
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";

@Override
Expand Down Expand Up @@ -72,18 +71,26 @@ private void restore(String... indices) {
);
}

private void verifyRestoredData(Map<String, Long> indexStats, String indexName) {
// This is required to get updated number from already active shards which were not restored
refresh(indexName);
private void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS));
// This is to ensure that shards that were already assigned will get latest count
refresh(indexName);
assertBusy(
() -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)),
30,
TimeUnit.SECONDS
);
IndexResponse response = indexSingleDoc(indexName);
if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) {
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
}
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1);
assertBusy(
() -> assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1),
30,
TimeUnit.SECONDS
);
}

private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
Expand All @@ -100,15 +107,15 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException {
public void testRemoteTranslogRestoreWithNoDataPostCommit() throws Exception {
testRestoreFlow(1, true, randomIntBetween(1, 5));
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException {
public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws Exception {
testRestoreFlow(1, false, randomIntBetween(1, 5));
}

Expand All @@ -117,7 +124,7 @@ public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException
* and unrefreshed data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithRefreshedData() throws IOException {
public void testRemoteTranslogRestoreWithRefreshedData() throws Exception {
testRestoreFlow(randomIntBetween(2, 5), false, randomIntBetween(1, 5));
}

Expand All @@ -126,25 +133,23 @@ public void testRemoteTranslogRestoreWithRefreshedData() throws IOException {
* and unrefreshed data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
public void testRemoteTranslogRestoreWithCommittedData() throws Exception {
testRestoreFlow(randomIntBetween(2, 5), true, randomIntBetween(1, 5));
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException {
public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws Exception {
testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5));
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException {
public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws Exception {
testRestoreFlowBothPrimaryReplicasDown(1, false, randomIntBetween(1, 5));
}

Expand All @@ -153,8 +158,7 @@ public void testRTSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOExc
* and unrefreshed data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException {
public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws Exception {
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), false, randomIntBetween(1, 5));
}

Expand All @@ -163,12 +167,11 @@ public void testRTSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOExcepti
* and unrefreshed data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException {
public void testRTSRestoreWithCommittedDataPrimaryReplicaDown() throws Exception {
testRestoreFlowBothPrimaryReplicasDown(randomIntBetween(2, 5), true, randomIntBetween(1, 5));
}

private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long> indexStats) {
private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long> indexStats) throws Exception {
restore(INDEX_NAME);
ensureGreen(INDEX_NAME);
// This is required to get updated number from already active shards which were not restored
Expand All @@ -183,7 +186,7 @@ private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException {
private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int shardCount) throws Exception {
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
Expand All @@ -202,10 +205,10 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int sh
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException {
private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, boolean invokeFlush, int shardCount) throws Exception {
prepareCluster(1, 2, INDEX_NAME, 1, shardCount);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
assertEquals(shardCount * 2, getNumShards(INDEX_NAME).totalNumShards);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(INDEX_NAME)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
Expand All @@ -221,14 +224,14 @@ private void testRestoreFlowBothPrimaryReplicasDown(int numberOfIterations, bool
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invokeFlush, int shardCount) throws IOException {
private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invokeFlush, int shardCount) throws Exception {
prepareCluster(1, 3, INDEX_NAMES, 1, shardCount);
String[] indices = INDEX_NAMES.split(",");
Map<String, Map<String, Long>> indicesStats = new HashMap<>();
for (String index : indices) {
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, index);
indicesStats.put(index, indexStats);
assertEquals(shardCount, getNumShards(index).totalNumShards);
assertEquals(shardCount * 2, getNumShards(index).totalNumShards);
}

for (String index : indices) {
Expand Down Expand Up @@ -259,7 +262,7 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo
);
ensureGreen(indices);
for (String index : indices) {
assertEquals(shardCount, getNumShards(index).totalNumShards);
assertEquals(shardCount * 2, getNumShards(index).totalNumShards);
verifyRestoredData(indicesStats.get(index), index);
}
}
Expand All @@ -280,7 +283,7 @@ public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException {
}
}

public void testRestoreFlowNoRedIndex() {
public void testRestoreFlowNoRedIndex() throws Exception {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME);
Expand All @@ -302,7 +305,7 @@ public void testRestoreFlowNoRedIndex() {
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480")
public void testRTSRestoreWithCommittedDataMultipleIndicesPatterns() throws IOException {
public void testRTSRestoreWithCommittedDataMultipleIndicesPatterns() throws Exception {
testRestoreFlowMultipleIndices(2, true, randomIntBetween(1, 5));
}

Expand All @@ -313,7 +316,7 @@ public void testRTSRestoreWithCommittedDataMultipleIndicesPatterns() throws IOEx
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480")
public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOException {
public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws Exception {
int shardCount = randomIntBetween(1, 5);
prepareCluster(1, 3, INDEX_NAMES, 1, shardCount);
String[] indices = INDEX_NAMES.split(",");
Expand Down Expand Up @@ -354,7 +357,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio
* with only some of the remote-enabled red indices requested for the restore.
* @throws IOException IO Exception.
*/
public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOException {
public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws Exception {
int shardCount = randomIntBetween(1, 5);
prepareCluster(1, 3, INDEX_NAMES, 0, shardCount);
String[] indices = INDEX_NAMES.split(",");
Expand Down Expand Up @@ -402,7 +405,7 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8480")
public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOException {
public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws Exception {
int shardCount = randomIntBetween(1, 5);
prepareCluster(1, 3, INDEX_NAMES, 1, shardCount);
String[] indices = INDEX_NAMES.split(",");
Expand Down Expand Up @@ -451,7 +454,7 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc
* when the index has no data.
* @throws IOException IO Exception.
*/
public void testRTSRestoreDataOnlyInTranslog() throws IOException {
public void testRTSRestoreDataOnlyInTranslog() throws Exception {
testRestoreFlow(0, true, randomIntBetween(1, 5));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecovery
public Builder initializeAsRemoteStoreRestore(
IndexMetadata indexMetadata,
RemoteStoreRecoverySource recoverySource,
Map<ShardId, ShardRouting> activeInitializingShards
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap,
boolean restoreAllShards
) {
final UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.EXISTING_INDEX_RESTORED,
Expand All @@ -465,11 +466,34 @@ public Builder initializeAsRemoteStoreRestore(
}
for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) {
ShardId shardId = new ShardId(index, shardNumber);
if (indexShardRoutingTableMap.containsKey(shardId) == false) {
throw new IllegalStateException("IndexShardRoutingTable is not present for shardId: " + shardId);
}
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
if (activeInitializingShards.containsKey(shardId)) {
indexShardRoutingBuilder.addShard(activeInitializingShards.get(shardId));
} else {
IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingTableMap.get(shardId);
if (restoreAllShards || indexShardRoutingTable.primaryShard().unassigned()) {
// Primary shard to be recovered from remote store.
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo));
// All the replica shards to be recovered from peer recovery.
indexShardRoutingTable.replicaShards()
.forEach(
shardRouting -> indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo)
)
);
} else {
// Primary is either active or initializing. Do not trigger restore.
indexShardRoutingBuilder.addShard(indexShardRoutingTable.primaryShard());
// Replica, if unassigned, trigger peer recovery else no action.
for (ShardRouting shardRouting : indexShardRoutingTable.replicaShards()) {
if (shardRouting.unassigned()) {
indexShardRoutingBuilder.addShard(
ShardRouting.newUnassigned(shardId, false, PeerRecoverySource.INSTANCE, unassignedInfo)
);
} else {
indexShardRoutingBuilder.addShard(shardRouting);
}
}
}
shards.put(shardNumber, indexShardRoutingBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,10 +575,11 @@ public Builder addAsFromOpenToClose(IndexMetadata indexMetadata) {
public Builder addAsRemoteStoreRestore(
IndexMetadata indexMetadata,
RemoteStoreRecoverySource recoverySource,
Map<ShardId, ShardRouting> activeInitializingShards
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap,
boolean restoreAllShards
) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex())
.initializeAsRemoteStoreRestore(indexMetadata, recoverySource, activeInitializingShards);
.initializeAsRemoteStoreRestore(indexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards);
add(indexRoutingBuilder);
return this;
}
Expand Down
Loading

0 comments on commit 569d5c2

Please sign in to comment.