Skip to content

Commit

Permalink
[Segment Replication] Add Segment Replication Specific Integration Te…
Browse files Browse the repository at this point in the history
…sts (opensearch-project#11773)

* Run a few tests with Segment Replication enabled.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Update reason for ignoring test.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* remove @ignore to resolve :server:forbiddenApisInternalClusterTest check.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix spotlessCheck.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* add conditional logic for force refresh.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix failing errors.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Use parameterization for running segment replication tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix failing test.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* add new waitForReplication() and refactor.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments on PR and revert back changes made to SegmentReplication Tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* revert changes made to Segrep tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Refactor and address comments.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix failure of using forbidden api new Random().

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add comments to debug.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Remove non-critical tests from running with segrep.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix test to run with segrep.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* separate out refresh and waitForReplication into different methods.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* refactor with usage of waitForReplication().

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix parameters passed in factory for IndexStatsIT.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Update IndexstatsIT to run with segrep

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 authored and Peter Alfonsi committed Mar 1, 2024
1 parent 4e8328b commit 6b738c6
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand Down Expand Up @@ -175,17 +174,6 @@ private IndexShard getIndexShard(ClusterState state, ShardRouting routing, Strin
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName);
}

/**
 * Fetch IndexShard by shardId, multiple shards per node allowed.
 *
 * @param node      name of the cluster node hosting the shard
 * @param shardId   id of the shard to look up
 * @param indexName name of the index the shard belongs to
 * @return the {@link IndexShard} instance for the given shard on the given node
 * @throws IllegalStateException if the node does not host the requested shard
 */
protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) {
    final Index index = resolveIndex(indexName);
    IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
    IndexService indexService = indicesService.indexServiceSafe(index);
    final Optional<Integer> id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst();
    // Avoid unchecked Optional.get(): fail with a descriptive message instead of a bare NoSuchElementException
    // when the shard is not allocated to this node.
    return indexService.getShard(
        id.orElseThrow(() -> new IllegalStateException("Shard " + shardId + " not found on node " + node))
    );
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
Expand All @@ -130,7 +131,8 @@ public IndexStatsIT(Settings settings) {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() }
);
}

Expand Down Expand Up @@ -175,7 +177,7 @@ public void testFieldDataStats() throws InterruptedException {
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -299,7 +301,7 @@ public void testClearAllCaches() throws Exception {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
indexRandomForConcurrentSearch("test");

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
Expand Down Expand Up @@ -667,7 +669,7 @@ public void testSimpleStats() throws Exception {
client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
refresh();
refreshAndWaitForReplication();

NumShards test1 = getNumShards("test1");
long test1ExpectedWrites = 2 * test1.dataCopies;
Expand All @@ -682,7 +684,13 @@ public void testSimpleStats() throws Exception {
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(0L));
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));

// This assert should not be done on segrep-enabled indices because we are asserting the Indexing/Write operations count on
// all primary and replica shards. But in the case of segrep, Indexing/Write operations don't happen on replica shards, so we
// can skip this assert check for segrep-enabled indices.
if (isSegmentReplicationEnabledForIndex("test1") == false && isSegmentReplicationEnabledForIndex("test2") == false) {
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
}
assertThat(stats.getTotal().getStore(), notNullValue());
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getFlush(), notNullValue());
Expand Down Expand Up @@ -825,6 +833,7 @@ public void testMergeStats() {
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet();

refreshAndWaitForReplication();
assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0L));
}
Expand All @@ -851,7 +860,7 @@ public void testSegmentsStats() {

client().admin().indices().prepareFlush().get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareRefresh().get();
refreshAndWaitForReplication();
stats = client().admin().indices().prepareStats().setSegments(true).get();

assertThat(stats.getTotal().getSegments(), notNullValue());
Expand All @@ -869,7 +878,7 @@ public void testAllFlags() throws Exception {
client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();

client().admin().indices().prepareRefresh().execute().actionGet();
refreshAndWaitForReplication();
IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
Flag[] values = CommonStatsFlags.Flag.values();
for (Flag flag : values) {
Expand Down Expand Up @@ -1453,6 +1462,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
refreshAndWaitForReplication();
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.recovery;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
Expand All @@ -52,10 +54,11 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -69,12 +72,26 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;

public class RecoveryWhileUnderLoadIT extends OpenSearchIntegTestCase {
public class RecoveryWhileUnderLoadIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

/** Creates a test instance with the static cluster {@link Settings} supplied by {@code parameters()}. */
public RecoveryWhileUnderLoadIT(Settings settings) {
super(settings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
    // Run the full suite twice: once with document replication, once with segment replication.
    final Settings documentReplication = Settings.builder()
        .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT)
        .build();
    final Settings segmentReplication = Settings.builder()
        .put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT)
        .build();
    return Arrays.asList(new Object[] { documentReplication }, new Object[] { segmentReplication });
}

private final Logger logger = LogManager.getLogger(RecoveryWhileUnderLoadIT.class);

public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {
Expand Down Expand Up @@ -150,7 +167,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception {
logger.info("--> indexing threads stopped");

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -211,7 +228,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() thr
logger.info("--> indexing threads stopped");

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -325,7 +342,7 @@ public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception
);

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numberOfShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -375,7 +392,7 @@ public void testRecoverWhileRelocating() throws Exception {
ensureGreen(TimeValue.timeValueMinutes(5));

logger.info("--> refreshing the index");
refreshAndAssert();
assertAfterRefreshAndWaitForReplication();
logger.info("--> verifying indexed content");
iterateAssertCount(numShards, 10, indexer.getIds());
}
Expand Down Expand Up @@ -474,10 +491,11 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat
);
}

private void refreshAndAssert() throws Exception {
/**
 * Refreshes all indices (retrying for up to 5 minutes until every shard refresh succeeds),
 * then waits for segment replication to catch up on replica shards.
 */
private void assertAfterRefreshAndWaitForReplication() throws Exception {
    // Retry the refresh until it succeeds on all shards; shards may still be recovering under load.
    assertBusy(() -> assertAllSuccessful(client().admin().indices().prepareRefresh().get()), 5, TimeUnit.MINUTES);
    waitForReplication();
}
}
Loading

0 comments on commit 6b738c6

Please sign in to comment.