diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 1dc8dc955f7c6..e82101896818e 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -51,4 +51,5 @@ BWC_VERSION: - "2.2.1" - "2.2.2" - "2.3.0" + - "2.3.1" - "2.4.0" diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f5a3cb806070..9da56b80fadef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Support for labels on version bump PRs, skip label support for changelog verifier ([#4391](https://github.com/opensearch-project/OpenSearch/pull/4391)) - Update previous release bwc version to 2.4.0 ([#4455](https://github.com/opensearch-project/OpenSearch/pull/4455)) - 2.3.0 release notes ([#4457](https://github.com/opensearch-project/OpenSearch/pull/4457)) +- Added missing javadocs for `:distribution:tools` modules ([#4483](https://github.com/opensearch-project/OpenSearch/pull/4483)) +- Add BWC version 2.3.1 ([#4513](https://github.com/opensearch-project/OpenSearch/pull/4513)) +- [Segment Replication] Add snapshot and restore tests for segment replication feature ([#3993](https://github.com/opensearch-project/OpenSearch/pull/3993)) ### Dependencies +- Bumps `reactive-streams` from 1.0.3 to 1.0.4 - Bumps `org.gradle.test-retry` from 1.4.0 to 1.4.1 @@ -32,9 +36,11 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) - Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240)) - Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156)) +- Weighted round-robin scheduling policy for shard coordination traffic ([#4241](https://github.com/opensearch-project/OpenSearch/pull/4241)) - Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253)) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards.
([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) +- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318)) ### Deprecated @@ -60,7 +68,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4426](https://github.com/opensearch-project/OpenSearch/pull/4426)) - [Bug]: gradle check failing with java heap OutOfMemoryError (([#4328](https://github.com/opensearch-project/OpenSearch/ - `opensearch.bat` fails to execute when install path includes spaces ([#4362](https://github.com/opensearch-project/OpenSearch/pull/4362)) +- Getting security exception due to access denied 'java.lang.RuntimePermission' 'accessDeclaredMembers' when trying to get snapshot with S3 IRSA ([#4469](https://github.com/opensearch-project/OpenSearch/pull/4469)) - Fixed flaky test `ResourceAwareTasksTests.testTaskIdPersistsInThreadContext` ([#4484](https://github.com/opensearch-project/OpenSearch/pull/4484)) +- Fixed the ignore_malformed setting to also ignore objects ([#4494](https://github.com/opensearch-project/OpenSearch/pull/4494)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/distribution/tools/java-version-checker/src/main/java/org/opensearch/tools/java_version_checker/SuppressForbidden.java b/distribution/tools/java-version-checker/src/main/java/org/opensearch/tools/java_version_checker/SuppressForbidden.java index 725718d85b179..d02e4e98b1287 100644 --- a/distribution/tools/java-version-checker/src/main/java/org/opensearch/tools/java_version_checker/SuppressForbidden.java +++ b/distribution/tools/java-version-checker/src/main/java/org/opensearch/tools/java_version_checker/SuppressForbidden.java @@ -43,5 +43,10 @@ @Retention(RetentionPolicy.CLASS) @Target({ ElementType.CONSTRUCTOR, ElementType.FIELD, ElementType.METHOD, ElementType.TYPE }) public @interface SuppressForbidden { + /** + * The argument to this annotation, specifying the reason a forbidden API is being used. + * + * @return The reason the error is being suppressed. + */ String reason(); } diff --git a/distribution/tools/java-version-checker/src/main/java/org/opensearch/tools/java_version_checker/package-info.java b/distribution/tools/java-version-checker/src/main/java/org/opensearch/tools/java_version_checker/package-info.java new file mode 100644 index 0000000000000..a626a125bb4c9 --- /dev/null +++ b/distribution/tools/java-version-checker/src/main/java/org/opensearch/tools/java_version_checker/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Tools to validate minimum version of the runtime Java. 
+ */ +package org.opensearch.tools.java_version_checker; diff --git a/distribution/tools/keystore-cli/src/main/java/org/opensearch/common/settings/KeyStoreCli.java b/distribution/tools/keystore-cli/src/main/java/org/opensearch/common/settings/KeyStoreCli.java index 4789c5df416e6..7a772526cd66b 100644 --- a/distribution/tools/keystore-cli/src/main/java/org/opensearch/common/settings/KeyStoreCli.java +++ b/distribution/tools/keystore-cli/src/main/java/org/opensearch/common/settings/KeyStoreCli.java @@ -36,7 +36,7 @@ import org.opensearch.cli.Terminal; /** - * A cli tool for managing secrets in the opensearch keystore. + * A CLI tool for managing secrets in the OpenSearch keystore. */ public class KeyStoreCli extends LoggingAwareMultiCommand { @@ -52,6 +52,12 @@ private KeyStoreCli() { subcommands.put("has-passwd", new HasPasswordKeyStoreCommand()); } + /** + * Main entry point for the OpenSearch Keystore CLI tool. + * + * @param args CLI commands for managing secrets. + * @throws Exception if an exception was encountered executing the command. + */ public static void main(String[] args) throws Exception { exit(new KeyStoreCli().main(args, Terminal.DEFAULT)); } diff --git a/distribution/tools/keystore-cli/src/main/java/org/opensearch/common/settings/package-info.java b/distribution/tools/keystore-cli/src/main/java/org/opensearch/common/settings/package-info.java new file mode 100644 index 0000000000000..3969fb4f91e49 --- /dev/null +++ b/distribution/tools/keystore-cli/src/main/java/org/opensearch/common/settings/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Classes implementing a CLI tool for managing secrets in the OpenSearch keystore. + */ +package org.opensearch.common.settings; diff --git a/distribution/tools/launchers/build.gradle b/distribution/tools/launchers/build.gradle index 52100296ac7e6..7ebe5c7e64416 100644 --- a/distribution/tools/launchers/build.gradle +++ b/distribution/tools/launchers/build.gradle @@ -54,6 +54,5 @@ testingConventions { } javadoc.enabled = false -missingJavadoc.enabled = false loggerUsageCheck.enabled = false jarHell.enabled = false diff --git a/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/package-info.java b/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/package-info.java new file mode 100644 index 0000000000000..c77d9cab1f468 --- /dev/null +++ b/distribution/tools/launchers/src/main/java/org/opensearch/tools/launchers/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Classes implementing utility methods for launching JVMs. + */ +package org.opensearch.tools.launchers; diff --git a/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginCli.java b/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginCli.java index fc93068ce416b..9b06235e87e86 100644 --- a/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginCli.java +++ b/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginCli.java @@ -42,7 +42,7 @@ import java.util.Collections; /** - * A cli tool for adding, removing and listing plugins for opensearch. 
+ * A CLI tool for adding, removing and listing plugins for OpenSearch. */ public class PluginCli extends LoggingAwareMultiCommand { @@ -56,6 +56,12 @@ private PluginCli() { commands = Collections.unmodifiableCollection(subcommands.values()); } + /** + * Main entry point for the OpenSearch Plugin CLI tool. + * + * @param args CLI commands for managing plugins. + * @throws Exception if an exception was encountered executing the command. + */ public static void main(String[] args) throws Exception { exit(new PluginCli().main(args, Terminal.DEFAULT)); } diff --git a/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginHelper.java b/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginHelper.java index 1ef4dd9a36d1c..13d8ab62c1f8d 100644 --- a/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginHelper.java +++ b/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/PluginHelper.java @@ -19,6 +19,8 @@ */ public class PluginHelper { + private PluginHelper() {} + /** * Verify if a plugin exists with any folder name. * @param pluginPath the path for the plugins directory. diff --git a/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/package-info.java b/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/package-info.java new file mode 100644 index 0000000000000..b762e59ae8095 --- /dev/null +++ b/distribution/tools/plugin-cli/src/main/java/org/opensearch/plugins/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Classes implementing a CLI tool for managing plugins in OpenSearch. 
+ */ +package org.opensearch.plugins; diff --git a/gradle/missing-javadoc.gradle b/gradle/missing-javadoc.gradle index 6b3dacd3e905a..248a714f4f3e3 100644 --- a/gradle/missing-javadoc.gradle +++ b/gradle/missing-javadoc.gradle @@ -95,10 +95,6 @@ configure([ project(":client:client-benchmark-noop-api-plugin"), project(":client:rest-high-level"), project(":client:test"), - project(":distribution:tools:java-version-checker"), - project(":distribution:tools:keystore-cli"), - project(":distribution:tools:launchers"), - project(":distribution:tools:plugin-cli"), project(":doc-tools"), project(":example-plugins:custom-settings"), project(":example-plugins:custom-significance-heuristic"), diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 08cd32e80a7ca..755f00ac9b1b3 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -55,7 +55,7 @@ dependencies { api "io.netty:netty-transport-native-unix-common:${versions.netty}" implementation project(':modules:transport-netty4') api 'com.azure:azure-storage-blob:12.16.1' - api 'org.reactivestreams:reactive-streams:1.0.3' + api 'org.reactivestreams:reactive-streams:1.0.4' api 'io.projectreactor:reactor-core:3.4.18' api 'io.projectreactor.netty:reactor-netty:1.0.18' api 'io.projectreactor.netty:reactor-netty-core:1.0.22' diff --git a/plugins/repository-azure/licenses/reactive-streams-1.0.3.jar.sha1 b/plugins/repository-azure/licenses/reactive-streams-1.0.3.jar.sha1 deleted file mode 100644 index 77210f7c7b402..0000000000000 --- a/plugins/repository-azure/licenses/reactive-streams-1.0.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -d9fb7a7926ffa635b3dcaa5049fb2bfa25b3e7d0 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/reactive-streams-1.0.4.jar.sha1 b/plugins/repository-azure/licenses/reactive-streams-1.0.4.jar.sha1 new file mode 100644 index 0000000000000..45a80e3f7e361 --- /dev/null +++ b/plugins/repository-azure/licenses/reactive-streams-1.0.4.jar.sha1 @@ -0,0 +1 @@ +3864a1320d97d7b045f729a326e1e077661f31b7 \ No newline at end of file diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java index 18bb62944dede..930af6f8a9799 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java @@ -305,21 +305,28 @@ static AWSCredentialsProvider buildCredentials(Logger logger, S3ClientSettings c } if (irsaCredentials.getIdentityTokenFile() == null) { - return new PrivilegedSTSAssumeRoleSessionCredentialsProvider<>( - securityTokenService, + final STSAssumeRoleSessionCredentialsProvider.Builder stsCredentialsProviderBuilder = new STSAssumeRoleSessionCredentialsProvider.Builder(irsaCredentials.getRoleArn(), irsaCredentials.getRoleSessionName()) - .withStsClient(securityTokenService) - .build() + .withStsClient(securityTokenService); + + final STSAssumeRoleSessionCredentialsProvider stsCredentialsProvider = SocketAccess.doPrivileged( + stsCredentialsProviderBuilder::build ); + + return new PrivilegedSTSAssumeRoleSessionCredentialsProvider<>(securityTokenService, stsCredentialsProvider); } else { - return new PrivilegedSTSAssumeRoleSessionCredentialsProvider<>( - securityTokenService, + final STSAssumeRoleWithWebIdentitySessionCredentialsProvider.Builder stsCredentialsProviderBuilder = new 
STSAssumeRoleWithWebIdentitySessionCredentialsProvider.Builder( irsaCredentials.getRoleArn(), irsaCredentials.getRoleSessionName(), irsaCredentials.getIdentityTokenFile() - ).withStsClient(securityTokenService).build() + ).withStsClient(securityTokenService); + + final STSAssumeRoleWithWebIdentitySessionCredentialsProvider stsCredentialsProvider = SocketAccess.doPrivileged( + stsCredentialsProviderBuilder::build ); + + return new PrivilegedSTSAssumeRoleSessionCredentialsProvider<>(securityTokenService, stsCredentialsProvider); } } else if (basicCredentials != null) { logger.debug("Using basic key/secret credentials"); diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java new file mode 100644 index 0000000000000..d92f2af3f4bfd --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -0,0 +1,279 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.snapshots; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.junit.BeforeClass; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.InternalTestCluster; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase { + private static final String INDEX_NAME = "test-segrep-idx"; + private static final String RESTORED_INDEX_NAME = INDEX_NAME + "-restored"; + private static final int SHARD_COUNT = 1; + private static final int REPLICA_COUNT = 1; + private static final int DOC_COUNT = 1010; + + private static final String REPOSITORY_NAME = "test-segrep-repo"; + private static final String SNAPSHOT_NAME = "test-segrep-snapshot"; + + @BeforeClass + public static void assumeFeatureFlag() { + assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + } + + public Settings segRepEnableIndexSettings() { + return 
getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings docRepEnableIndexSettings() { + return getShardSettings().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + public Settings.Builder getShardSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT); + } + + public Settings restoreIndexSegRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + } + + public Settings restoreIndexDocRepSettings() { + return Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void ingestData(int docCount, String indexName) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + indexName, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(indexName); + } + } + + // Start cluster with provided settings and return the node names as list + public List startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception { + // Start primary + final String primaryNode = internalCluster().startNode(); + List nodeNames = new ArrayList<>(); + nodeNames.add(primaryNode); + for (int i = 0; i < replicaCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + createIndex(INDEX_NAME, indexSettings); + ensureGreen(INDEX_NAME); + // Ingest data + ingestData(DOC_COUNT, INDEX_NAME); + return nodeNames; + } + + public void createSnapshot() { + // Snapshot declaration + Path absolutePath = randomRepoPath().toAbsolutePath(); + // Create snapshot + createRepository(REPOSITORY_NAME, "fs", absolutePath); + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(true) + .setIndices(INDEX_NAME) + .get(); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS)); + } + + public RestoreSnapshotResponse restoreSnapshotWithSettings(Settings indexSettings) { + RestoreSnapshotRequestBuilder builder = client().admin() + .cluster() + .prepareRestoreSnapshot(REPOSITORY_NAME, SNAPSHOT_NAME) + .setWaitForCompletion(false) + .setRenamePattern(INDEX_NAME) + .setRenameReplacement(RESTORED_INDEX_NAME); + if (indexSettings != null) { + builder.setIndexSettings(indexSettings); + } + return builder.get(); + } + + public void testRestoreOnSegRep() throws Exception { + // Start cluster with one primary and one replica node + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse 
settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception { + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ingestData(5000, RESTORED_INDEX_NAME); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT + 5000); + } + + public void testSnapshotOnDocRep_RestoreOnSegRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexSegRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnSegRep_RestoreOnDocRep() throws Exception { + // Start a cluster with one primary and one replica + startClusterWithSettings(segRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testSnapshotOnDocRep_RestoreOnDocRep() throws Exception { + startClusterWithSettings(docRepEnableIndexSettings(), 1); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new 
DeleteIndexRequest(INDEX_NAME)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(restoreIndexDocRepSettings()); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "DOCUMENT"); + + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } + + public void testRestoreOnReplicaNode() throws Exception { + List nodeNames = startClusterWithSettings(segRepEnableIndexSettings(), 1); + final String primaryNode = nodeNames.get(0); + createSnapshot(); + // Delete index + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + assertFalse("index [" + INDEX_NAME + "] should have been deleted", indexExists(INDEX_NAME)); + + // stop the primary node so that restoration happens on replica node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + + RestoreSnapshotResponse restoreSnapshotResponse = restoreSnapshotWithSettings(null); + + // Assertions + assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); + internalCluster().startNode(); + ensureGreen(RESTORED_INDEX_NAME); + GetSettingsResponse settingsResponse = client().admin() + .indices() + .getSettings(new GetSettingsRequest().indices(RESTORED_INDEX_NAME)) + .get(); + assertEquals(settingsResponse.getSetting(RESTORED_INDEX_NAME, "index.replication.type"), "SEGMENT"); + SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); + assertHitCount(resp, DOC_COUNT); + } +} diff --git a/server/src/main/java/org/opensearch/Version.java b/server/src/main/java/org/opensearch/Version.java index 978f0ee2186f2..9c53a0f449a40 100644 --- a/server/src/main/java/org/opensearch/Version.java +++ b/server/src/main/java/org/opensearch/Version.java @@ -98,6 +98,7 @@ public class Version implements Comparable, ToXContentFragment { public static final Version V_2_2_1 = new Version(2020199, org.apache.lucene.util.Version.LUCENE_9_3_0); public static final Version V_2_2_2 = new Version(2020299, org.apache.lucene.util.Version.LUCENE_9_3_0); public static final Version V_2_3_0 = new Version(2030099, org.apache.lucene.util.Version.LUCENE_9_3_0); + public static final Version V_2_3_1 = new Version(2030199, org.apache.lucene.util.Version.LUCENE_9_3_0); public static final Version V_2_4_0 = new Version(2040099, org.apache.lucene.util.Version.LUCENE_9_3_0); public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_4_0); public static final Version CURRENT = V_3_0_0; diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index f8ba520e465e2..46552bb5d6a03 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.MetadataMappingService; import org.opensearch.cluster.metadata.MetadataUpdateSettingsService; import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; 
import org.opensearch.cluster.routing.DelayedAllocationService; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; @@ -191,6 +192,7 @@ public static List getNamedWriteables() { ComposableIndexTemplateMetadata::readDiffFrom ); registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom); + registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom); // Task Status (not Diffable) entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); return entries; @@ -274,6 +276,13 @@ public static List getNamedXWriteables() { DataStreamMetadata::fromXContent ) ); + entries.add( + new NamedXContentRegistry.Entry( + Metadata.Custom.class, + new ParseField(WeightedRoutingMetadata.TYPE), + WeightedRoutingMetadata::fromXContent + ) + ); return entries; } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 5f7e98e9e1199..086865d2170c3 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -810,6 +810,14 @@ public IndexGraveyard indexGraveyard() { return custom(IndexGraveyard.TYPE); } + /** + * * + * @return The weighted routing metadata for search requests + */ + public WeightedRoutingMetadata weightedRoutingMetadata() { + return custom(WeightedRoutingMetadata.TYPE); + } + public T custom(String type) { return (T) customs.get(type); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java new file mode 100644 index 0000000000000..27beb21f28f7c --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.metadata; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchParseException; +import org.opensearch.Version; +import org.opensearch.cluster.AbstractNamedDiffable; +import org.opensearch.cluster.NamedDiff; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +/** + * Contains metadata for weighted routing + * + * @opensearch.internal + */ +public class WeightedRoutingMetadata extends AbstractNamedDiffable implements Metadata.Custom { + private static final Logger logger = LogManager.getLogger(WeightedRoutingMetadata.class); + public static final String TYPE = "weighted_shard_routing"; + public static final String AWARENESS = "awareness"; + private WeightedRouting weightedRouting; + + public WeightedRouting getWeightedRouting() { + return weightedRouting; + } + + public WeightedRoutingMetadata setWeightedRouting(WeightedRouting weightedRouting) { + this.weightedRouting = weightedRouting; + return this; + } + + public WeightedRoutingMetadata(StreamInput in) throws IOException { + if (in.available() != 0) { + this.weightedRouting = new WeightedRouting(in); + } + } + + public WeightedRoutingMetadata(WeightedRouting weightedRouting) { + this.weightedRouting = weightedRouting; + } + + @Override + public EnumSet context() { + return Metadata.API_AND_GATEWAY; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_2_4_0; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + if (weightedRouting != null) { + weightedRouting.writeTo(out); + } + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Metadata.Custom.class, TYPE, in); + } + + public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException { + String attrKey = null; + Double attrValue; + String attributeName = null; + Map weights = new HashMap<>(); + WeightedRouting weightedRouting = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + String awarenessField = null; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + awarenessField = parser.currentName(); + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing metadata [{}], expected " + "object", + awarenessField + ); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + attributeName = parser.currentName(); + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing metadata [{}], expected" + " object", + attributeName + ); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + attrValue = 
Double.parseDouble(parser.text()); + weights.put(attrKey, attrValue); + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing metadata attribute " + "[{}], unknown type", + attributeName + ); + } + } + } + } + } + weightedRouting = new WeightedRouting(attributeName, weights); + return new WeightedRoutingMetadata(weightedRouting); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + WeightedRoutingMetadata that = (WeightedRoutingMetadata) o; + return weightedRouting.equals(that.weightedRouting); + } + + @Override + public int hashCode() { + return weightedRouting.hashCode(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + toXContent(weightedRouting, builder); + return builder; + } + + public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder) throws IOException { + builder.startObject(AWARENESS); + builder.startObject(weightedRouting.attributeName()); + for (Map.Entry<String, Double> entry : weightedRouting.weights().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); + builder.endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } +}
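For reference, the serialized form that `toXContent` produces nests the per-zone weights under an `awareness` object keyed by the attribute name. A minimal sketch of round-tripping that shape (attribute and zone names are illustrative; key order of `Map.of` is unspecified):

```java
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
import org.opensearch.cluster.routing.WeightedRouting;
import org.opensearch.common.Strings;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;

import java.util.Map;

public class WeightedRoutingXContentSketch {
    public static void main(String[] args) throws Exception {
        // Zone "c" is weighted to zero, so it should receive no coordination traffic.
        WeightedRouting weightedRouting = new WeightedRouting("zone", Map.of("a", 1.0, "b", 1.0, "c", 0.0));
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
        WeightedRoutingMetadata.toXContent(weightedRouting, builder);
        builder.endObject();
        // Prints, up to key order: {"awareness":{"zone":{"a":1.0,"b":1.0,"c":0.0}}}
        System.out.println(Strings.toString(builder));
    }
}
```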
diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index d4597f47d9a6c..9026e7068e9fe 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -85,6 +85,9 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> { private volatile Map<AttributesKey, AttributesRoutings> activeShardsByAttributes = emptyMap(); private volatile Map<AttributesKey, AttributesRoutings> initializingShardsByAttributes = emptyMap(); private final Object shardsByAttributeMutex = new Object(); + private final Object shardsByWeightMutex = new Object(); + private volatile Map<WeightedRoutingKey, List<ShardRouting>> activeShardsByWeight = emptyMap(); + private volatile Map<WeightedRoutingKey, List<ShardRouting>> initializingShardsByWeight = emptyMap(); /** * The initializing list, including ones that are initializing on a target node because of relocation. @@ -233,6 +236,10 @@ public List<ShardRouting> assignedShards() { return this.assignedShards; } + public Map<WeightedRoutingKey, List<ShardRouting>> getActiveShardsByWeight() { + return activeShardsByWeight; + } + public ShardIterator shardsRandomIt() { return new PlainShardIterator(shardId, shuffler.shuffle(shards)); } @@ -292,6 +299,73 @@ public ShardIterator activeInitializingShardsRankedIt( return new PlainShardIterator(shardId, ordered); } + /** + * Returns an iterator over active and initializing shards, ordered by the weighted + * round-robin scheduling policy. + * + * @param weightedRouting the weighted routing to apply + * @param nodes discovered nodes in the cluster + * @return an iterator over active and initializing shards, ordered by the weighted round-robin + * scheduling policy, with initializing shards always last in the iteration order. + */ + public ShardIterator activeInitializingShardsWeightedIt(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) { + final int seed = shuffler.nextSeed(); + List<ShardRouting> ordered = new ArrayList<>(); + List<ShardRouting> orderedActiveShards = getActiveShardsByWeight(weightedRouting, nodes, defaultWeight); + ordered.addAll(shuffler.shuffle(orderedActiveShards, seed)); + if (!allInitializingShards.isEmpty()) { + List<ShardRouting> orderedInitializingShards = getInitializingShardsByWeight(weightedRouting, nodes, defaultWeight); + ordered.addAll(orderedInitializingShards); + } + return new PlainShardIterator(shardId, ordered); + } + + /** + * Returns a list containing shard routings ordered using weighted round-robin scheduling. + */ + private List<ShardRouting> shardsOrderedByWeight( + List<ShardRouting> shards, + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight + ) { + WeightedRoundRobin<ShardRouting> weightedRoundRobin = new WeightedRoundRobin<>( + calculateShardWeight(shards, weightedRouting, nodes, defaultWeight) + ); + List<WeightedRoundRobin.Entity<ShardRouting>> shardsOrderedbyWeight = weightedRoundRobin.orderEntities(); + List<ShardRouting> orderedShardRouting = new ArrayList<>(activeShards.size()); + if (shardsOrderedbyWeight != null) { + for (WeightedRoundRobin.Entity<ShardRouting> shardRouting : shardsOrderedbyWeight) { + orderedShardRouting.add(shardRouting.getTarget()); + } + } + return orderedShardRouting; + } + + /** + * Returns a list containing each shard routing and its associated weight. This function iterates through all the shards and + * uses the weighted routing to find the weight for the corresponding shard. This is fed to weighted round-robin scheduling + * to order shards by weight. + */ + private List<WeightedRoundRobin.Entity<ShardRouting>> calculateShardWeight( + List<ShardRouting> shards, + WeightedRouting weightedRouting, + DiscoveryNodes nodes, + double defaultWeight + ) { + List<WeightedRoundRobin.Entity<ShardRouting>> shardsWithWeights = new ArrayList<>(); + for (ShardRouting shard : shards) { + DiscoveryNode node = nodes.get(shard.currentNodeId()); + if (node != null) { + String attVal = node.getAttributes().get(weightedRouting.attributeName()); + // If the weight for a zone is not defined, fall back to the configured default weight + Double weight = weightedRouting.weights().getOrDefault(attVal, defaultWeight); + shardsWithWeights.add(new WeightedRoundRobin.Entity<>(weight, shard)); + } + } + return shardsWithWeights; + }
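The `getOrDefault` fallback in `calculateShardWeight` is what `cluster.routing.weighted.default_weight` feeds: shards in a zone with no declared weight still participate, at the default weight, rather than being dropped from routing. A small self-contained illustration of just that fallback (zone names hypothetical):

```java
import java.util.Map;

public class DefaultWeightFallbackSketch {
    public static void main(String[] args) {
        // Weights declared for zones "a" and "b" only; "c" is undeclared.
        Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0);
        double defaultWeight = 1.0; // value of cluster.routing.weighted.default_weight

        for (String zone : new String[] { "a", "b", "c" }) {
            // Mirrors calculateShardWeight: an undeclared zone falls back to the default,
            // so its shards keep serving traffic.
            double weight = weights.getOrDefault(zone, defaultWeight);
            System.out.println(zone + " -> " + weight);
        }
    }
}
```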
private static Set<String> getAllNodeIds(final List<ShardRouting> shards) { final Set<String> nodeIds = new HashSet<>(); for (ShardRouting shard : shards) { @@ -698,6 +772,66 @@ public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) { return count; } + /** + * Key for WeightedRouting Shard Iterator + * + * @opensearch.internal + */ + public static class WeightedRoutingKey { + private final WeightedRouting weightedRouting; + + public WeightedRoutingKey(WeightedRouting weightedRouting) { + this.weightedRouting = weightedRouting; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + WeightedRoutingKey key = (WeightedRoutingKey) o; + if (!weightedRouting.equals(key.weightedRouting)) return false; + return true; + } + + @Override + public int hashCode() { + return weightedRouting.hashCode(); + } + } + + /** + * Gets the active shard routing from memory if available, else calculates and puts it in memory. + */ + private List<ShardRouting> getActiveShardsByWeight(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) { + WeightedRoutingKey key = new WeightedRoutingKey(weightedRouting); + List<ShardRouting> shardRoutings = activeShardsByWeight.get(key); + if (shardRoutings == null) { + synchronized (shardsByWeightMutex) { + shardRoutings = shardsOrderedByWeight(activeShards, weightedRouting, nodes, defaultWeight); + activeShardsByWeight = new MapBuilder<WeightedRoutingKey, List<ShardRouting>>().put(key, shardRoutings).immutableMap(); + } + } + return shardRoutings; + } + + /** + * Gets the initializing shard routing from memory if available, else calculates and puts it in memory. + */ + private List<ShardRouting> getInitializingShardsByWeight(WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight) { + WeightedRoutingKey key = new WeightedRoutingKey(weightedRouting); + List<ShardRouting> shardRoutings = initializingShardsByWeight.get(key); + if (shardRoutings == null) { + synchronized (shardsByWeightMutex) { + shardRoutings = shardsOrderedByWeight(allInitializingShards, weightedRouting, nodes, defaultWeight); + initializingShardsByWeight = new MapBuilder<WeightedRoutingKey, List<ShardRouting>>().put(key, shardRoutings).immutableMap(); + } + } + return shardRoutings; + }
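Both getters follow the same copy-on-write pattern: lock-free reads through a volatile immutable map, and on a miss the ordering is recomputed under `shardsByWeightMutex` and the cache replaced with a single-entry map keyed by the current weights. A stripped-down sketch of that pattern (class and method names are hypothetical stand-ins):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the caching in getActiveShardsByWeight/getInitializingShardsByWeight.
class WeightedOrderCache<K, V> {
    private volatile Map<K, List<V>> cache = Collections.emptyMap();
    private final Object mutex = new Object();

    List<V> getOrCompute(K key, Function<K, List<V>> compute) {
        List<V> ordered = cache.get(key); // lock-free read of the volatile map
        if (ordered == null) {
            synchronized (mutex) {
                ordered = compute.apply(key);
                // Mirror the diff: keep only the latest key, so a weight change
                // drops orderings computed for the previous configuration.
                cache = Collections.singletonMap(key, ordered);
            }
        }
        return ordered;
    }
}
```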
/** * Builder of an index shard routing table. * diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 30f6408c19783..9026da667ccb0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -34,6 +34,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.common.Nullable; @@ -75,9 +76,17 @@ public class OperationRouting { Setting.Property.Dynamic, Setting.Property.NodeScope ); + public static final Setting<Double> WEIGHTED_ROUTING_DEFAULT_WEIGHT = Setting.doubleSetting( + "cluster.routing.weighted.default_weight", + 1.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); private volatile List<String> awarenessAttributes; private volatile boolean useAdaptiveReplicaSelection; private volatile boolean ignoreAwarenessAttr; + private volatile double weightedRoutingDefaultWeight; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -88,8 +97,10 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { this::setAwarenessAttributes ); this.useAdaptiveReplicaSelection = USE_ADAPTIVE_REPLICA_SELECTION_SETTING.get(settings); + this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes); + clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -100,6 +111,10 @@ void setIgnoreAwarenessAttributes(boolean ignoreAwarenessAttributes) { this.ignoreAwarenessAttr = ignoreAwarenessAttributes; } + void setWeightedRoutingDefaultWeight(double weightedRoutingDefaultWeight) { + this.weightedRoutingDefaultWeight = weightedRoutingDefaultWeight; + } + public boolean isIgnoreAwarenessAttr() { return ignoreAwarenessAttr; } @@ -116,6 +131,10 @@ public boolean ignoreAwarenessAttributes() { return this.awarenessAttributes.isEmpty() || this.ignoreAwarenessAttr; } + public double getWeightedRoutingDefaultWeight() { + return this.weightedRoutingDefaultWeight; + } + public ShardIterator indexShards(ClusterState clusterState, String index, String id, @Nullable String routing) { return shards(clusterState, index, id, routing).shardsIt(); } @@ -133,7 +152,8 @@ public ShardIterator getShards( clusterState.nodes(), preference, null, - null + null, + clusterState.getMetadata().weightedRoutingMetadata() ); } @@ -145,7 +165,8 @@ public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) { clusterState.nodes(), preference, null, - null + null, + clusterState.metadata().weightedRoutingMetadata() ); } @@ -175,7 +196,8 @@ public GroupShardsIterator<ShardIterator> searchShards( clusterState.nodes(), preference, collectorService, - nodeCounts + nodeCounts, + clusterState.metadata().weightedRoutingMetadata() ); if (iterator != null) { set.add(iterator); @@ -225,10 +247,11 @@ private ShardIterator preferenceActiveShardIterator( DiscoveryNodes nodes, @Nullable String preference, @Nullable ResponseCollectorService collectorService, - @Nullable Map<String, Long> nodeCounts + @Nullable Map<String, Long> nodeCounts, + @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { if (preference == null || preference.isEmpty()) { - return shardRoutings(indexShard, nodes, collectorService, nodeCounts); + return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata); } if (preference.charAt(0) == '_') { Preference preferenceType = Preference.parse(preference); @@ -255,7 +278,7 @@ private ShardIterator preferenceActiveShardIterator( } // no more preference if (index == -1 || index == preference.length() - 1) { - return shardRoutings(indexShard, nodes, collectorService, nodeCounts); + return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata); } else { // update the preference and continue preference = preference.substring(index + 1); @@ -298,9 +321,16 @@ private ShardIterator shardRoutings( IndexShardRoutingTable indexShard, DiscoveryNodes nodes, @Nullable ResponseCollectorService collectorService, - @Nullable Map<String, Long> nodeCounts + @Nullable Map<String, Long> nodeCounts, + @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (ignoreAwarenessAttributes()) { + if (weightedRoutingMetadata != null) { + return indexShard.activeInitializingShardsWeightedIt( + weightedRoutingMetadata.getWeightedRouting(), + nodes, + getWeightedRoutingDefaultWeight() + ); + } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); } else {
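`WEIGHTED_ROUTING_DEFAULT_WEIGHT` is a dynamic setting, so it can be retuned on a live cluster. A minimal sketch of the declare-and-subscribe pattern used above (standalone class and field names are hypothetical; `doubleSetting` takes key, default, minimum, then properties):

```java
import org.opensearch.common.settings.Setting;

public class DefaultWeightSettingSketch {
    // Dynamic (updatable at runtime) and node-scoped, mirroring the declaration in OperationRouting.
    public static final Setting<Double> EXAMPLE_DEFAULT_WEIGHT = Setting.doubleSetting(
        "cluster.routing.weighted.default_weight",
        1.0,
        1.0,
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    private volatile double defaultWeight = 1.0;

    // In OperationRouting the equivalent consumer is registered via
    // clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setDefaultWeight).
    void setDefaultWeight(double defaultWeight) {
        this.defaultWeight = defaultWeight;
    }
}
```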
diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoundRobin.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoundRobin.java new file mode 100644 index 0000000000000..15d437db9c8ff --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoundRobin.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import java.util.ArrayList; +import java.util.List; + +/** + * Weighted Round Robin scheduling policy + */ +public class WeightedRoundRobin<T> { + + private List<Entity<T>> entities; + + public WeightedRoundRobin(List<Entity<T>> entities) { + this.entities = entities; + } + + /** + * @return list of entities ordered using weighted round-robin scheduling + * (http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling) + */ + public List<Entity<T>> orderEntities() { + int lastSelectedEntity = -1; + int size = entities.size(); + double currentWeight = 0; + List<Entity<T>> orderedWeight = new ArrayList<>(); + if (size == 0) { + return null; + } + // Find the maximum weight and the greatest common divisor of weights across all entities + double maxWeight = 0; + double sumWeight = 0; + Double gcd = null; + for (Entity<T> entity : entities) { + maxWeight = Math.max(maxWeight, entity.getWeight()); + gcd = (gcd == null) ? entity.getWeight() : gcd(gcd, entity.getWeight()); + sumWeight += entity.getWeight() > 0 ? entity.getWeight() : 0; + } + int count = 0; + while (count < sumWeight) { + lastSelectedEntity = (lastSelectedEntity + 1) % size; + if (lastSelectedEntity == 0) { + currentWeight = currentWeight - gcd; + if (currentWeight <= 0) { + currentWeight = maxWeight; + if (currentWeight == 0) { + return orderedWeight; + } + } + } + if (entities.get(lastSelectedEntity).getWeight() >= currentWeight) { + orderedWeight.add(entities.get(lastSelectedEntity)); + count++; + } + } + return orderedWeight; + } + + /** + * Returns the greatest common divisor of two numbers, via Euclid's algorithm + * (https://en.wikipedia.org/wiki/Greatest_common_divisor#Using_Euclid.27s_algorithm) + * + * @param a first number + * @param b second number + * @return greatest common divisor + */ + private double gcd(double a, double b) { + return (b == 0) ? a : gcd(b, a % b); + } + + static final class Entity<T> { + private double weight; + private T target; + + public Entity(double weight, T target) { + this.weight = weight; + this.target = target; + } + + public T getTarget() { + return this.target; + } + + public void setTarget(T target) { + this.target = target; + } + + public double getWeight() { + return this.weight; + } + + public void setWeight(double weight) { + this.weight = weight; + } + } + +}
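To make the interleaving concrete, here is a hedged usage sketch (targets are hypothetical; `Entity` is package-private, so this assumes the `org.opensearch.cluster.routing` package). With weights 2/1/0, `orderEntities()` yields node-a, node-a, node-b, and the zero-weight target is never scheduled:

```java
package org.opensearch.cluster.routing;

import java.util.List;

public class WeightedRoundRobinSketch {
    public static void main(String[] args) {
        List<WeightedRoundRobin.Entity<String>> entities = List.of(
            new WeightedRoundRobin.Entity<>(2, "node-a"),
            new WeightedRoundRobin.Entity<>(1, "node-b"),
            new WeightedRoundRobin.Entity<>(0, "node-c") // zero weight: never scheduled
        );
        WeightedRoundRobin<String> wrr = new WeightedRoundRobin<>(entities);
        // maxWeight = 2, gcd = 1, sumWeight = 3, so the loop emits three picks:
        // node-a, node-a, node-b (heavier targets are front-loaded, LVS-style).
        for (WeightedRoundRobin.Entity<String> entity : wrr.orderEntities()) {
            System.out.println(entity.getTarget());
        }
    }
}
```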
diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java new file mode 100644 index 0000000000000..df2d8d595eaab --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * Entity for Weighted Round Robin weights + * + * @opensearch.internal + */ +public class WeightedRouting implements Writeable { + private String attributeName; + private Map<String, Double> weights; + + public WeightedRouting(String attributeName, Map<String, Double> weights) { + this.attributeName = attributeName; + this.weights = weights; + } + + public WeightedRouting(WeightedRouting weightedRouting) { + this.attributeName = weightedRouting.attributeName(); + this.weights = weightedRouting.weights; + } + + public WeightedRouting(StreamInput in) throws IOException { + attributeName = in.readString(); + weights = (Map<String, Double>) in.readGenericValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(attributeName); + out.writeGenericValue(weights); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + WeightedRouting that = (WeightedRouting) o; + if (!attributeName.equals(that.attributeName)) return false; + return weights.equals(that.weights); + } + + @Override + public int hashCode() { + return Objects.hash(attributeName, weights); + } + + @Override + public String toString() { + return "WeightedRouting{" + attributeName + "}{" + weights().toString() + "}"; + } + + public Map<String, Double> weights() { + return this.weights; + } + + public String attributeName() { + return this.attributeName; + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 971fb518ff1da..1665614c18496 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -529,6 +529,7 @@ public void apply(Settings value, Settings current, Settings previous) { Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, OperationRouting.IGNORE_AWARENESS_ATTRIBUTES_SETTING, + OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java index 137ca4be1ca87..3acf5d4ea85ee 100644 --- a/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java @@ -34,6 +34,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; + import org.apache.lucene.document.Field; import org.apache.lucene.document.FieldType; import org.apache.lucene.index.IndexOptions; @@ -268,6 +269,8 @@ public void parse(ParseContext context) throws IOException { try { parseCreateField(context); } catch (Exception e) { + boolean ignore_malformed = false; + if (context.indexSettings() != null) ignore_malformed = IGNORE_MALFORMED_SETTING.get(context.indexSettings().getSettings()); String valuePreview = ""; try { XContentParser parser = context.parser(); @@ -278,23 +281,27 @@ public void parse(ParseContext context) throws IOException { valuePreview = complexValue.toString(); } } catch (Exception innerException) { + if (ignore_malformed == false) { + throw new MapperParsingException( + "failed to parse field [{}] of type [{}] in document with id '{}'. " + "Could not parse field value preview,", + e, + fieldType().name(), + fieldType().typeName(), + context.sourceToParse().id() + ); + } + } + + if (ignore_malformed == false) { throw new MapperParsingException( - "failed to parse field [{}] of type [{}] in document with id '{}'. " + "Could not parse field value preview,", + "failed to parse field [{}] of type [{}] in document with id '{}'. " + "Preview of field's value: '{}'", e, fieldType().name(), fieldType().typeName(), - context.sourceToParse().id() + context.sourceToParse().id(), + valuePreview ); } - - throw new MapperParsingException( - "failed to parse field [{}] of type [{}] in document with id '{}'. " + "Preview of field's value: '{}'", - e, - fieldType().name(), - fieldType().typeName(), - context.sourceToParse().id(), - valuePreview - ); } multiFields.parse(this, context); }
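The effect of the FieldMapper change: with the index-level `index.mapping.ignore_malformed` setting enabled, a field value that fails to parse — now including objects, not only scalars — is skipped instead of failing the whole document. A tiny sketch of the setting involved (the key is the one `IGNORE_MALFORMED_SETTING` reads; the standalone class is illustrative):

```java
import org.opensearch.common.settings.Settings;

public class IgnoreMalformedSettingSketch {
    public static void main(String[] args) {
        // With this index setting, parse failures no longer raise MapperParsingException
        // in FieldMapper#parse; the malformed field is dropped and indexing continues.
        Settings indexSettings = Settings.builder()
            .put("index.mapping.ignore_malformed", true)
            .build();
        System.out.println(indexSettings.getAsBoolean("index.mapping.ignore_malformed", false));
    }
}
```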
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index dcb7bdeb30e4f..9185ef0d440ce 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -163,8 +163,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -203,6 +203,7 @@ import java.util.stream.StreamSupport; import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; /** @@ -1703,13 +1704,8 @@ public void prepareForIndexRecovery() { * @return a sequence number that an operation-based peer recovery can start with. * This is the first operation after the local checkpoint of the safe commit if exists. */ - public long recoverLocallyUpToGlobalCheckpoint() { - assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; - if (state != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, state); - } - recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); - assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + private long recoverLocallyUpToGlobalCheckpoint() { + validateLocalRecoveryState(); final Optional<SequenceNumbers.CommitInfo> safeCommit; final long globalCheckpoint; try { @@ -1792,6 +1788,54 @@ } } + public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) { + if (localTranslog) { + return recoverLocallyUpToGlobalCheckpoint(); + } else { + return recoverLocallyUptoLastCommit(); + } + } + + /** + * Figures out the starting sequence number for recovery based on the last commit. + * + * @return the starting sequence number from which the recovery should start.
+ */ + private long recoverLocallyUptoLastCommit() { + assert isRemoteTranslogEnabled() : "Remote translog store is not enabled"; + long seqNo; + validateLocalRecoveryState(); + + try { + seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO)); + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.error("skip local recovery as no index commit found", e); + return UNASSIGNED_SEQ_NO; + } catch (Exception e) { + logger.error("skip local recovery as failed to find the safe commit", e); + return UNASSIGNED_SEQ_NO; + } + + try { + maybeCheckIndex(); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + recoveryState.getTranslog().totalLocal(0); + } catch (Exception e) { + logger.error("check index failed during fetch seqNo", e); + return UNASSIGNED_SEQ_NO; + } + return seqNo; + } + + private void validateLocalRecoveryState() { + assert Thread.holdsLock(mutex) == false : "recover locally under mutex"; + if (state != IndexShardState.RECOVERING) { + throw new IndexShardNotRecoveringException(shardId, state); + } + recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX); + assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; + } + public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } @@ -1998,7 +2042,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t private boolean assertSequenceNumbersInCommit() throws IOException { final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; - assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; + assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid [" + userData.get(Engine.HISTORY_UUID_KEY) @@ -3275,6 +3319,10 @@ private boolean isRemoteStoreEnabled() { return (remoteStore != null && shardRouting.primary()); } + public boolean isRemoteTranslogEnabled() { + return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); + } + /** * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. 
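Taken together, the IndexShard changes split replica recovery into two start-point calculations: the existing local-translog path replays operations up to the global checkpoint, while the new remote-translog path trusts the last flushed commit alone. The sketch below condenses that dispatch; it is illustrative only, with the constant values, the `max_seq_no` user-data key, and the helper names standing in for the real OpenSearch internals.

```java
import java.util.Map;

// Illustrative sketch of the start-seqno dispatch added above; not the real API.
final class StartSeqNoSketch {
    static final long UNASSIGNED_SEQ_NO = -2;          // mirrors SequenceNumbers.UNASSIGNED_SEQ_NO
    static final String MAX_SEQ_NO_KEY = "max_seq_no"; // assumed commit user-data key

    static long startingSeqNo(boolean localTranslog, Map<String, String> lastCommitUserData) {
        if (localTranslog) {
            // Pre-existing path: replay the local translog up to the global checkpoint.
            return recoverLocallyUpToGlobalCheckpoint();
        }
        // Remote translog store owns the history: the last commit alone decides.
        String maxSeqNo = lastCommitUserData.get(MAX_SEQ_NO_KEY);
        if (maxSeqNo == null) {
            return UNASSIGNED_SEQ_NO; // no usable commit: fall back to file-based recovery
        }
        return Long.parseLong(maxSeqNo); // ops beyond this come from the remote translog
    }

    private static long recoverLocallyUpToGlobalCheckpoint() {
        return UNASSIGNED_SEQ_NO; // placeholder for the existing local recovery path
    }
}
```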
diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 85141556657f3..b5702431ed4bf 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -36,10 +36,10 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.cluster.ClusterState; @@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request)); } + /** + * Initiates recovery of the replica. TODO - Revisit this along with peer recovery retention leases (PRRL); see the + * related GitHub issue. + * @param recoveryId the recovery id + * @param preExistingRequest the start recovery request + */ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) { final String actionName; final TransportRequest requestToSend; @@ -238,10 +244,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node"; logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); indexShard.prepareForIndexRecovery(); - final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]"; - startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo); + startRequest = getStartRecoveryRequest( + logger, + clusterService.localNode(), + recoveryTarget, + startingSeqNo, + !remoteTranslogEnabled + ); requestToSend = startRequest; actionName = PeerRecoverySourceService.Actions.START_RECOVERY; } catch (final Exception e) { @@ -270,21 +283,32 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi ); } + public static StartRecoveryRequest getStartRecoveryRequest( + Logger logger, + DiscoveryNode localNode, + RecoveryTarget recoveryTarget, + long startingSeqNo + ) { + return getStartRecoveryRequest(logger, localNode, recoveryTarget, startingSeqNo, true); + } + /** * Prepare the start recovery request. * - * @param logger the logger - * @param localNode the local node of the recovery target - * @param recoveryTarget the target of the recovery - * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. - * This is the first operation after the local checkpoint of the safe commit if exists.
+ * @param logger the logger + * @param localNode the local node of the recovery target + * @param recoveryTarget the target of the recovery + * @param startingSeqNo a sequence number that an operation-based peer recovery can start with. + * This is the first operation after the local checkpoint of the safe commit, if one exists. + * @param verifyTranslog whether the recovery request should verify that the translog is consistent with the snapshot store metadata. * @return a start recovery request */ public static StartRecoveryRequest getStartRecoveryRequest( Logger logger, DiscoveryNode localNode, RecoveryTarget recoveryTarget, - long startingSeqNo + long startingSeqNo, + boolean verifyTranslog ) { final StartRecoveryRequest request; logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode()); Store.MetadataSnapshot metadataSnapshot; try { metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); - // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index. - try { - final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); - assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; - } catch (IOException | TranslogCorruptedException e) { - logger.warn( - new ParameterizedMessage( - "error while reading global checkpoint from translog, " - + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", - startingSeqNo - ), - e - ); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - startingSeqNo = UNASSIGNED_SEQ_NO; + if (verifyTranslog) { + // Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene + // index. + try { + final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY); + final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID); + assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint; + } catch (IOException | TranslogCorruptedException e) { + logger.warn( + new ParameterizedMessage( + "error while reading global checkpoint from translog, " + + "resetting the starting sequence number from {} to unassigned and recovering as if there are none", + startingSeqNo + ), + e + ); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + startingSeqNo = UNASSIGNED_SEQ_NO; + } } } catch (final org.apache.lucene.index.IndexNotFoundException e) { // happens on an empty folder. no need to log
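Before moving on to RecoverySourceHandler, it is worth pinning down what the new `verifyTranslog` flag buys: the translog/Lucene consistency check only makes sense when the recovery target actually keeps a local translog, so the remote-translog case skips it entirely. A minimal sketch of that gate, assuming a supplier that throws when the local translog is missing or corrupt (the types here are simplified stand-ins, not the real `Translog`/`Store` signatures):

```java
import java.util.function.LongSupplier;

// Illustrative only: how the verifyTranslog gate decides the starting seqno.
final class VerifyTranslogSketch {
    static final long UNASSIGNED_SEQ_NO = -2;

    // globalCheckpoint.getAsLong() stands in for reading the checkpoint out of the
    // local translog; it throws when the translog is unreadable, which is exactly
    // the failure the gate reacts to.
    static long resolveStartingSeqNo(long startingSeqNo, boolean verifyTranslog, LongSupplier globalCheckpoint) {
        if (verifyTranslog == false) {
            return startingSeqNo; // remote translog store: nothing local to cross-check
        }
        try {
            final long gcp = globalCheckpoint.getAsLong();
            assert gcp + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo;
            return startingSeqNo;
        } catch (RuntimeException e) {
            // translog unreadable: discard local files and recover from scratch
            return UNASSIGNED_SEQ_NO;
        }
    }
}
```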
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 9e219db5a4c96..665e79722770e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -316,60 +316,85 @@ && isTargetSameHistory() } assert startingSeqNo >= 0 : "startingSeqNo must be non negative. got: " + startingSeqNo; - sendFileStep.whenComplete(r -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); - // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); - }, onFailure); - - prepareEngineStep.whenComplete(prepareEngineTime -> { - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); - /* - * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. - * This means that any document indexed into the primary after this will be replicated to this replica as well - * make sure to do this before sampling the max sequence number in the next step, to ensure that we send - * all documents up to maxSeqNo in phase2. - */ - RunUnderPrimaryPermit.run( - () -> shard.initiateTracking(request.targetAllocationId()), - shardId + " initiating tracking of " + request.targetAllocationId(), - shard, - cancellableThreads, - logger - ); - - final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - if (logger.isTraceEnabled()) { - logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); - } - final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( - PEER_RECOVERY_NAME, - startingSeqNo, - Long.MAX_VALUE, - false, - true - ); - resources.add(phase2Snapshot); - retentionLock.close(); + boolean isRecoveringReplicaWithRemoteTxLogEnabledIndex = request.isPrimaryRelocation() == false + && shard.isRemoteTranslogEnabled(); + + if (isRecoveringReplicaWithRemoteTxLogEnabledIndex) { + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(0, prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + retentionLock.close(); + sendSnapshotStep.onResponse(new SendSnapshotResult(endingSeqNo, 0, TimeValue.ZERO)); + }, onFailure); + } else { + sendFileStep.whenComplete(r -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); + // For a sequence based recovery, the target can keep its local translog + prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); + }, onFailure); + + prepareEngineStep.whenComplete(prepareEngineTime -> { + assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); + /* + * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. + * This means that any document indexed into the primary after this will be replicated to this replica as well + * make sure to do this before sampling the max sequence number in the next step, to ensure that we send + * all documents up to maxSeqNo in phase2.
+ */ + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId(), + shard, + cancellableThreads, + logger + ); - // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values - // are at least as high as the corresponding values on the primary when any of these operations were executed on it. - final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); - final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - final RetentionLeases retentionLeases = shard.getRetentionLeases(); - final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); - phase2( - startingSeqNo, - endingSeqNo, - phase2Snapshot, - maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, - retentionLeases, - mappingVersionOnPrimary, - sendSnapshotStep - ); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); + if (logger.isTraceEnabled()) { + logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); + } + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( + PEER_RECOVERY_NAME, + startingSeqNo, + Long.MAX_VALUE, + false, + true + ); + resources.add(phase2Snapshot); + retentionLock.close(); + + // we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values + // are at least as high as the corresponding values on the primary when any of these operations were executed on it. + final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); + final RetentionLeases retentionLeases = shard.getRetentionLeases(); + final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion(); + phase2( + startingSeqNo, + endingSeqNo, + phase2Snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + mappingVersionOnPrimary, + sendSnapshotStep + ); - }, onFailure); + }, onFailure); + } // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2 final long trimAboveSeqNo = startingSeqNo - 1;
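The net effect of the branch above: when the source is recovering a replica of a remote-translog-enabled index (and this is not a primary relocation), phase 2 ships zero operations — the target is prepared with an empty translog batch and the send-snapshot step completes immediately with `SendSnapshotResult(endingSeqNo, 0, TimeValue.ZERO)`. A condensed, illustrative sketch of that decision (the `Plan` record and method names are invented for this example):

```java
// Which phase-2 plan a recovery source would pick; illustration only.
final class Phase2PlanSketch {
    record Plan(int translogOpsToPrepare, boolean snapshotAndReplayHistory) {}

    static Plan pickPlan(boolean primaryRelocation, boolean remoteTranslogEnabled, int historyOps) {
        if (primaryRelocation == false && remoteTranslogEnabled) {
            // Replica of a remote-translog index: history lives in the remote store,
            // so prepare the target with zero ops and skip the changes snapshot.
            return new Plan(0, false);
        }
        // Classic path: snapshot ops from startingSeqNo and replay them in phase 2.
        return new Plan(historyOps, true);
    }
}
```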
diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java new file mode 100644 index 0000000000000..a0a9d2bd9586b --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.Map; + +public class WeightedRoutingMetadataTests extends AbstractXContentTestCase<WeightedRoutingMetadata> { + @Override + protected WeightedRoutingMetadata createTestInstance() { + Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + return weightedRoutingMetadata; + } + + @Override + protected WeightedRoutingMetadata doParseInstance(XContentParser parser) throws IOException { + return WeightedRoutingMetadata.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index 8bf2b1626292a..87cab4a006a63 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -36,6 +36,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; @@ -759,6 +760,232 @@ public void testAdaptiveReplicaSelectionWithZoneAwarenessIgnored() throws Except terminate(threadPool); } + private ClusterState clusterStateForWeightedRouting(String[] indexNames, int numShards, int numReplicas) { + DiscoveryNode[] allNodes = setUpNodesForWeightedRouting(); + ClusterState state = ClusterStateCreationUtils.state(allNodes[0], allNodes[6], allNodes); + + Map<String, List<DiscoveryNode>> discoveryNodeMap = new HashMap<>(); + List<DiscoveryNode> nodesZoneA = new ArrayList<>(); + nodesZoneA.add(allNodes[0]); + nodesZoneA.add(allNodes[1]); + + List<DiscoveryNode> nodesZoneB = new ArrayList<>(); + nodesZoneB.add(allNodes[2]); + nodesZoneB.add(allNodes[3]); + + List<DiscoveryNode> nodesZoneC = new ArrayList<>(); + nodesZoneC.add(allNodes[4]); + nodesZoneC.add(allNodes[5]); + discoveryNodeMap.put("a", nodesZoneA); + discoveryNodeMap.put("b", nodesZoneB); + discoveryNodeMap.put("c", nodesZoneC); + + // Updating cluster state with node, index and shard details + state = updateStatetoTestWeightedRouting(indexNames, numShards, numReplicas, state, discoveryNodeMap); + + return state; + + } + + private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map<String, Double> weights) { + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); + return clusterState; + } + + public void testWeightedOperationRouting() throws Exception { + final int numIndices = 2; + final int numShards = 3; + final int numReplicas = 2; + // setting up indices + final String[] indexNames = new
String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + ClusterService clusterService = null; + TestThreadPool threadPool = null; + try { + ClusterState state = clusterStateForWeightedRouting(indexNames, numShards, numReplicas); + + Settings setting = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone").build(); + + threadPool = new TestThreadPool("testThatOnlyNodesSupport"); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + + OperationRouting opRouting = new OperationRouting( + setting, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + assertTrue(opRouting.ignoreAwarenessAttributes()); + Set selectedNodes = new HashSet<>(); + ResponseCollectorService collector = new ResponseCollectorService(clusterService); + Map outstandingRequests = new HashMap<>(); + + // Setting up weights for weighted round-robin in cluster state + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + state = setWeightedRoutingWeights(state, weights); + + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + // search shards call + GroupShardsIterator groupIterator = opRouting.searchShards( + state, + indexNames, + null, + null, + collector, + outstandingRequests + + ); + + for (ShardIterator it : groupIterator) { + List shardRoutings = Collections.singletonList(it.nextOrNull()); + for (ShardRouting shardRouting : shardRoutings) { + selectedNodes.add(shardRouting.currentNodeId()); + } + } + // tests no shards are assigned to nodes in zone c + for (String nodeID : selectedNodes) { + // No shards are assigned to nodes in zone c since its weight is 0 + assertFalse(nodeID.contains("c")); + } + + selectedNodes = new HashSet<>(); + setting = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone").build(); + + // Updating weighted round robin weights in cluster state + weights = Map.of("a", 1.0, "b", 0.0, "c", 1.0); + state = setWeightedRoutingWeights(state, weights); + + builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + opRouting = new OperationRouting(setting, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + + // search shards call + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + for (ShardIterator it : groupIterator) { + List shardRoutings = Collections.singletonList(it.nextOrNull()); + for (ShardRouting shardRouting : shardRoutings) { + selectedNodes.add(shardRouting.currentNodeId()); + } + } + // tests that no shards are assigned to zone with weight zero + for (String nodeID : selectedNodes) { + // No shards are assigned to nodes in zone b since its weight is 0 + assertFalse(nodeID.contains("b")); + } + } finally { + IOUtils.close(clusterService); + terminate(threadPool); + } + } + + public void testWeightedOperationRoutingWeightUndefinedForOneZone() throws Exception { + final int numIndices = 2; + final int numShards = 3; + final int numReplicas = 2; + // setting up indices + final String[] indexNames = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + indexNames[i] = "test" + i; + } + + ClusterService clusterService = null; + TestThreadPool threadPool = null; + try { + ClusterState state = clusterStateForWeightedRouting(indexNames, numShards, numReplicas); + + Settings setting = 
Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone").build(); + + threadPool = new TestThreadPool("testThatOnlyNodesSupport"); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + + OperationRouting opRouting = new OperationRouting( + setting, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + assertTrue(opRouting.ignoreAwarenessAttributes()); + Set selectedNodes = new HashSet<>(); + ResponseCollectorService collector = new ResponseCollectorService(clusterService); + Map outstandingRequests = new HashMap<>(); + + // Setting up weights for weighted round-robin in cluster state, weight for nodes in zone b is not set + Map weights = Map.of("a", 1.0, "c", 0.0); + state = setWeightedRoutingWeights(state, weights); + ClusterServiceUtils.setState(clusterService, ClusterState.builder(state)); + + // search shards call + GroupShardsIterator groupIterator = opRouting.searchShards( + state, + indexNames, + null, + null, + collector, + outstandingRequests + + ); + + for (ShardIterator it : groupIterator) { + List shardRoutings = Collections.singletonList(it.nextOrNull()); + for (ShardRouting shardRouting : shardRoutings) { + selectedNodes.add(shardRouting.currentNodeId()); + } + } + boolean weighAwayNodesInUndefinedZone = true; + // tests no shards are assigned to nodes in zone c + // tests shards are assigned to nodes in zone b + for (String nodeID : selectedNodes) { + // shard from nodes in zone c is not selected since its weight is 0 + assertFalse(nodeID.contains("c")); + if (nodeID.contains("b")) { + weighAwayNodesInUndefinedZone = false; + } + } + assertFalse(weighAwayNodesInUndefinedZone); + + selectedNodes = new HashSet<>(); + setting = Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone").build(); + + // Updating weighted round robin weights in cluster state + weights = Map.of("a", 0.0, "b", 1.0); + + state = setWeightedRoutingWeights(state, weights); + ClusterServiceUtils.setState(clusterService, ClusterState.builder(state)); + + opRouting = new OperationRouting(setting, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + + // search shards call + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + + for (ShardIterator it : groupIterator) { + List shardRoutings = Collections.singletonList(it.nextOrNull()); + for (ShardRouting shardRouting : shardRoutings) { + selectedNodes.add(shardRouting.currentNodeId()); + } + } + // tests that no shards are assigned to zone with weight zero + // tests shards are assigned to nodes in zone c + weighAwayNodesInUndefinedZone = true; + for (String nodeID : selectedNodes) { + // shard from nodes in zone a is not selected since its weight is 0 + assertFalse(nodeID.contains("a")); + if (nodeID.contains("c")) { + weighAwayNodesInUndefinedZone = false; + } + } + assertFalse(weighAwayNodesInUndefinedZone); + } finally { + IOUtils.close(clusterService); + terminate(threadPool); + } + } + private DiscoveryNode[] setupNodes() { // Sets up two data nodes in zone-a and one data node in zone-b List zones = Arrays.asList("a", "a", "b"); @@ -785,6 +1012,32 @@ private DiscoveryNode[] setupNodes() { return allNodes; } + private DiscoveryNode[] setUpNodesForWeightedRouting() { + List zones = Arrays.asList("a", "a", "b", "b", "c", "c"); + DiscoveryNode[] allNodes = new DiscoveryNode[7]; + int i = 0; + for (String zone : zones) { + DiscoveryNode node = new DiscoveryNode( + "node_" + 
zone + "_" + i, + buildNewFakeTransportAddress(), + singletonMap("zone", zone), + Collections.singleton(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + allNodes[i++] = node; + } + + DiscoveryNode clusterManager = new DiscoveryNode( + "cluster-manager", + buildNewFakeTransportAddress(), + Collections.emptyMap(), + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + allNodes[i] = clusterManager; + return allNodes; + } + public void testAllocationAwarenessDeprecation() { OperationRouting routing = new OperationRouting( Settings.builder() @@ -841,4 +1094,69 @@ private ClusterState updateStatetoTestARS( clusterState.routingTable(routingTableBuilder.build()); return clusterState.build(); } + + private ClusterState updateStatetoTestWeightedRouting( + String[] indices, + int numberOfShards, + int numberOfReplicas, + ClusterState state, + Map<String, List<DiscoveryNode>> discoveryNodeMap + ) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Metadata.Builder metadataBuilder = Metadata.builder(); + ClusterState.Builder clusterState = ClusterState.builder(state); + List<DiscoveryNode> nodesZoneA = discoveryNodeMap.get("a"); + List<DiscoveryNode> nodesZoneB = discoveryNodeMap.get("b"); + List<DiscoveryNode> nodesZoneC = discoveryNodeMap.get("c"); + for (String index : indices) { + IndexMetadata indexMetadata = IndexMetadata.builder(index) + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + for (int i = 0; i < numberOfShards; i++) { + final ShardId shardId = new ShardId(index, "_na_", i); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + // Assign all the primary shards on nodes in zone-a (node_a0 or node_a1) + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting( + index, + i, + nodesZoneA.get(randomInt(nodesZoneA.size() - 1)).getId(), + null, + true, + ShardRoutingState.STARTED + ) + ); + for (int replica = 0; replica < numberOfReplicas; replica++) { + // Assign the first replica to a node in zone-b and any remaining replicas to nodes in zone-c + String nodeId = ""; + if (replica == 0) { + nodeId = nodesZoneB.get(randomInt(nodesZoneB.size() - 1)).getId(); + } else { + nodeId = nodesZoneC.get(randomInt(nodesZoneC.size() - 1)).getId(); + } + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(index, i, nodeId, null, false, ShardRoutingState.STARTED) + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + } + routingTableBuilder.add(indexRoutingTableBuilder.build()); + } + // add weighted routing weights in metadata + Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + clusterState.metadata(metadataBuilder); + clusterState.routingTable(routingTableBuilder.build()); + return clusterState.build(); + } }
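The exact interleaving produced by `WeightedRoundRobin.orderEntities()` is pinned down by the expectations in the new test file below. As background, one common way to realize weighted round-robin scheduling is the "smooth" variant sketched here; note this is a generic illustration of the idea (assuming positive integer weights), not the ordering OpenSearch's implementation produces:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Smooth weighted round-robin: each step, add its weight to every candidate's
// credit, pick the largest credit, then charge the pick the total weight.
// Over sum(weights) steps, a target with weight w is picked exactly w times.
final class SmoothWrrSketch {
    static <T> List<T> schedule(Map<T, Integer> weights, int steps) {
        Map<T, Integer> credit = new LinkedHashMap<>();
        int total = 0;
        for (Map.Entry<T, Integer> e : weights.entrySet()) {
            credit.put(e.getKey(), 0);
            total += e.getValue();
        }
        List<T> out = new ArrayList<>();
        for (int i = 0; i < steps; i++) {
            T best = null;
            for (Map.Entry<T, Integer> e : weights.entrySet()) {
                int c = credit.merge(e.getKey(), e.getValue(), Integer::sum);
                if (best == null || c > credit.get(best)) {
                    best = e.getKey();
                }
            }
            out.add(best);
            credit.merge(best, -total, Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("A", 4);
        weights.put("B", 3);
        weights.put("C", 2);
        System.out.println(schedule(weights, 9)); // [A, B, C, A, B, A, C, B, A]
    }
}
```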
diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoundRobinTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoundRobinTests.java new file mode 100644 index 0000000000000..5f62d30486e86 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoundRobinTests.java @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class WeightedRoundRobinTests extends OpenSearchTestCase { + + public void testWeightedRoundRobinOrder() { + // weights set as A:4, B:3, C:2 + List<WeightedRoundRobin.Entity<String>> entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(4, "A")); + entity.add(new WeightedRoundRobin.Entity<>(3, "B")); + entity.add(new WeightedRoundRobin.Entity<>(2, "C")); + WeightedRoundRobin<String> weightedRoundRobin = new WeightedRoundRobin<String>(entity); + List<WeightedRoundRobin.Entity<String>> orderedEntities = weightedRoundRobin.orderEntities(); + List<String> expectedOrdering = Arrays.asList("A", "A", "B", "A", "B", "C", "A", "B", "C"); + List<String> actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + + // weights set as A:1, B:1, C:0 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(1, "A")); + entity.add(new WeightedRoundRobin.Entity<>(1, "B")); + entity.add(new WeightedRoundRobin.Entity<>(0, "C")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + expectedOrdering = Arrays.asList("A", "B"); + actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + + // weights set as A:0, B:0, C:0 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(0, "A")); + entity.add(new WeightedRoundRobin.Entity<>(0, "B")); + entity.add(new WeightedRoundRobin.Entity<>(0, "C")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + expectedOrdering = Arrays.asList(); + actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + + // weights set as A:-1, B:0, C:1 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(-1, "A")); + entity.add(new WeightedRoundRobin.Entity<>(0, "B")); + entity.add(new WeightedRoundRobin.Entity<>(1, "C")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + expectedOrdering = Arrays.asList("C"); + actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + + // weights set as A:-1, B:3, C:0, D:10 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(-1, "A")); + entity.add(new WeightedRoundRobin.Entity<>(3, "B")); + entity.add(new WeightedRoundRobin.Entity<>(0, "C")); + entity.add(new WeightedRoundRobin.Entity<>(10, "D")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + expectedOrdering = Arrays.asList("B", "D", "B", "D", "B", "D", "D", "D", "D", "D", "D", "D", "D"); + actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + + // weights set as A:-1, B:3, C:0, D:10000 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(-1, "A")); + entity.add(new WeightedRoundRobin.Entity<>(3, "B")); + entity.add(new WeightedRoundRobin.Entity<>(0, "C")); + entity.add(new WeightedRoundRobin.Entity<>(10000, "D")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + assertEquals(10003, orderedEntities.size()); + // Count of D's + int countD = 0; + // Count of B's + int countB = 0; + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + if (en.getTarget().equals("D")) { + countD++; + } else if (en.getTarget().equals("B")) { + countB++; + } + } + assertEquals(3, countB); + assertEquals(10000, countD); + + // weights set C:0 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(0, "C")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + expectedOrdering = Arrays.asList(); + actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + + // weights set C:1 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(1, "C")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + expectedOrdering = Arrays.asList("C"); + actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + + // weights set C:2 + entity = new ArrayList<WeightedRoundRobin.Entity<String>>(); + entity.add(new WeightedRoundRobin.Entity<>(2, "C")); + weightedRoundRobin = new WeightedRoundRobin<String>(entity); + orderedEntities = weightedRoundRobin.orderEntities(); + expectedOrdering = Arrays.asList("C", "C"); + actualOrdering = new ArrayList<>(); + for (WeightedRoundRobin.Entity<String> en : orderedEntities) { + actualOrdering.add(en.getTarget()); + } + assertEquals(expectedOrdering, actualOrdering); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 68ad47fa1bbc9..8f5aa1b764551 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -40,6 +40,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.GroupShardsIterator; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.OperationRouting; import org.opensearch.cluster.routing.PlainShardIterator; import org.opensearch.cluster.routing.RotationShardShuffler; @@ -48,11 +49,15 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardShuffler; import org.opensearch.cluster.routing.ShardsIterator; +import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.service.ClusterService; import
org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.ShardId; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.threadpool.TestThreadPool; import java.util.Arrays; import java.util.Collections; @@ -497,4 +502,209 @@ public void testReplicaShardPreferenceIters() throws Exception { } } + public void testWeightedRoutingWithDifferentWeights() { + TestThreadPool threadPool = null; + try { + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.awareness.attributes", "zone"); + AllocationService strategy = createAllocationService(settings.build()); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + threadPool = new TestThreadPool("testThatOnlyNodesSupport"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + Map node1Attributes = new HashMap<>(); + node1Attributes.put("zone", "zone1"); + Map node2Attributes = new HashMap<>(); + node2Attributes.put("zone", "zone2"); + Map node3Attributes = new HashMap<>(); + node3Attributes.put("zone", "zone3"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", unmodifiableMap(node1Attributes))) + .add(newNode("node2", unmodifiableMap(node2Attributes))) + .add(newNode("node3", unmodifiableMap(node3Attributes))) + .localNodeId("node1") + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + Map weights = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ShardIterator shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + + assertEquals(2, shardIterator.size()); + ShardRouting shardRouting; + while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); + } + + weights = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 1.0); + weightedRouting = new WeightedRouting("zone", weights); + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + assertEquals(3, shardIterator.size()); + + weights = Map.of("zone1", -1.0, "zone2", 0.0, "zone3", 1.0); + weightedRouting = new WeightedRouting("zone", weights); + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + assertEquals(1, shardIterator.size()); + while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2", "node1").contains(shardRouting.currentNodeId())); + } + + weights = 
Map.of("zone1", 0.0, "zone2", 0.0, "zone3", 0.0); + weightedRouting = new WeightedRouting("zone", weights); + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + assertEquals(0, shardIterator.size()); + } finally { + terminate(threadPool); + } + } + + public void testWeightedRoutingInMemoryStore() { + TestThreadPool threadPool = null; + try { + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.awareness.attributes", "zone"); + AllocationService strategy = createAllocationService(settings.build()); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(routingTable) + .build(); + + threadPool = new TestThreadPool("testThatOnlyNodesSupport"); + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + + Map node1Attributes = new HashMap<>(); + node1Attributes.put("zone", "zone1"); + Map node2Attributes = new HashMap<>(); + node2Attributes.put("zone", "zone2"); + Map node3Attributes = new HashMap<>(); + node3Attributes.put("zone", "zone3"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", unmodifiableMap(node1Attributes))) + .add(newNode("node2", unmodifiableMap(node2Attributes))) + .add(newNode("node3", unmodifiableMap(node3Attributes))) + .localNodeId("node1") + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + Map weights = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + IndexShardRoutingTable indexShardRoutingTable = clusterState.routingTable().index("test").shard(0); + + assertNull( + indexShardRoutingTable.getActiveShardsByWeight().get(new IndexShardRoutingTable.WeightedRoutingKey(weightedRouting)) + ); + ShardIterator shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + assertEquals(2, shardIterator.size()); + ShardRouting shardRouting; + while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); + } + + // Make iterator call with same WeightedRouting instance + assertNotNull( + indexShardRoutingTable.getActiveShardsByWeight().get(new IndexShardRoutingTable.WeightedRoutingKey(weightedRouting)) + ); + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + assertEquals(2, shardIterator.size()); + while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); + } + + // Make iterator call with new instance of WeightedRouting 
but same weights + Map weights1 = Map.of("zone1", 1.0, "zone2", 1.0, "zone3", 0.0); + weightedRouting = new WeightedRouting("zone", weights1); + assertNotNull( + indexShardRoutingTable.getActiveShardsByWeight().get(new IndexShardRoutingTable.WeightedRoutingKey(weightedRouting)) + ); + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + assertEquals(2, shardIterator.size()); + while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node3").contains(shardRouting.currentNodeId())); + } + + // Make iterator call with different weights + Map weights2 = Map.of("zone1", 1.0, "zone2", 0.0, "zone3", 1.0); + weightedRouting = new WeightedRouting("zone", weights2); + assertNull( + indexShardRoutingTable.getActiveShardsByWeight().get(new IndexShardRoutingTable.WeightedRoutingKey(weightedRouting)) + ); + shardIterator = clusterState.routingTable() + .index("test") + .shard(0) + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1); + assertEquals(2, shardIterator.size()); + while (shardIterator.remaining() > 0) { + shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertFalse(Arrays.asList("node2").contains(shardRouting.currentNodeId())); + } + } finally { + terminate(threadPool); + } + } } diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java new file mode 100644 index 0000000000000..5d317693e02df --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.shard; + +import org.junit.Assert; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.DocIdSeqNoAndSource; +import org.opensearch.index.engine.NRTReplicationEngine; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.WriteOnlyTranslogManager; +import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; + +public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") + .build(); + + public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step 1 - Start the primary, index docs, and flush + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + + // Step 2 - Start a replica so recovery happens, then check both shards hold the same docs + final IndexShard replica1 = shards.addReplica(); + shards.startAll(); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 3 - Index more docs, run segment replication, and check both shards still hold the same docs + int moreDocs = shards.indexDocs(randomIntBetween(10, 100)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + assertEquals(getDocIdAndSeqNos(primary), getDocIdAndSeqNos(replica1)); + + // Step 4 - Check that both shards have the expected doc count + assertDocCount(primary, numDocs + moreDocs); + assertDocCount(replica1, numDocs + moreDocs); + + // Step 5 - Start a new replica; recovery happens, and the new replica has the docs up to the last flush + final IndexShard replica2 = shards.addReplica(); + shards.startAll(); + assertDocCount(replica2, numDocs); + + // Step 6 - Run segment replication and check all shards have the same number of docs + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } + + public void testNoTranslogHistoryTransferred() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + // Step 1 - Start the primary, index docs, flush, index more docs, and check that the primary's translog is as expected + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + List<DocIdSeqNoAndSource> docIdAndSeqNosAfterFlush = getDocIdAndSeqNos(primary); + int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); + assertEquals(moreDocs, getTranslog(primary).totalOperations()); + + // Step 2 - Start a replica; recovery happens, and docs are recovered up to the last flush
+ final IndexShard replica = shards.addReplica(); + shards.startAll(); + assertEquals(docIdAndSeqNosAfterFlush, getDocIdAndSeqNos(replica)); + assertDocCount(replica, numDocs); + assertEquals(NRTReplicationEngine.class, replica.getEngine().getClass()); + + // Step 3 - Check replica's translog has no operations + assertEquals(WriteOnlyTranslogManager.class, replica.getEngine().translogManager().getClass()); + WriteOnlyTranslogManager replicaTranslogManager = (WriteOnlyTranslogManager) replica.getEngine().translogManager(); + assertEquals(0, replicaTranslogManager.getTranslog().totalOperations()); + + // Adding this for close to succeed + shards.flush(); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } + + public void testStartSequenceForReplicaRecovery() throws Exception { + try (ReplicationGroup shards = createGroup(0, settings, new NRTReplicationEngineFactory())) { + + shards.startPrimary(); + final IndexShard primary = shards.getPrimary(); + int numDocs = shards.indexDocs(randomIntBetween(10, 100)); + shards.flush(); + + final IndexShard replica = shards.addReplica(); + shards.startAll(); + + allowShardFailures(); + replica.failShard("test", null); + + final ShardRouting replicaRouting = replica.routingEntry(); + final IndexMetadata newIndexMetadata = IndexMetadata.builder(replica.indexSettings().getIndexMetadata()) + .primaryTerm(replicaRouting.shardId().id(), replica.getOperationPrimaryTerm() + 1) + .build(); + closeShards(replica); + shards.removeReplica(replica); + + int moreDocs = shards.indexDocs(randomIntBetween(20, 100)); + shards.flush(); + + IndexShard newReplicaShard = newShard( + newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + false, + ShardRoutingState.INITIALIZING, + RecoverySource.PeerRecoverySource.INSTANCE + ), + replica.shardPath(), + newIndexMetadata, + null, + null, + replica.getEngineFactory(), + replica.getEngineConfigFactory(), + replica.getGlobalCheckpointSyncer(), + replica.getRetentionLeaseSyncer(), + EMPTY_EVENT_LISTENER, + null + ); + shards.addReplica(newReplicaShard); + shards.recoverReplica(newReplicaShard, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener) { + @Override + public IndexShard indexShard() { + IndexShard idxShard = super.indexShard(); + // verify the starting sequence number while recovering a failed shard which has a valid last commit + long startingSeqNo = -1; + try { + startingSeqNo = Long.parseLong( + idxShard.store().readLastCommittedSegmentsInfo().getUserData().get(SequenceNumbers.MAX_SEQ_NO) + ); + } catch (IOException e) { + Assert.fail(); + } + assertEquals(numDocs - 1, startingSeqNo); + return idxShard; + } + }); + + shards.flush(); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs + moreDocs); + } + } +} diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 2a88345346e52..a50089831b3e9 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -211,7 +211,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception { IndexShard shard = newShard(false); shard.markAsRecovering("for testing", new RecoveryState(shard.routingEntry(), localNode, localNode)); shard.prepareForIndexRecovery(); - 
assertThat(shard.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(shard.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO)); assertThat(shard.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); assertThat(shard.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -239,7 +239,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception { ); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(globalCheckpoint + 1)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(globalCheckpoint + 1)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(expectedTotalLocal)); assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(expectedTotalLocal)); assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -254,7 +254,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception { replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE)); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -276,10 +276,10 @@ public void testPrepareIndexForPeerRecovery() throws Exception { replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); if (safeCommit.isPresent()) { - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0)); } else { - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(UNASSIGNED_SEQ_NO)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN)); } assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG)); @@ -322,7 +322,7 @@ public void testClosedIndexSkipsLocalRecovery() throws Exception { ); replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode)); replica.prepareForIndexRecovery(); - assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1)); + assertThat(replica.recoverLocallyAndFetchStartSeqNo(true), equalTo(safeCommit.get().localCheckpoint + 1)); assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0)); assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0)); assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ 
-349,7 +349,7 @@ public void testResetStartingSeqNoIfLastCommitCorrupted() throws Exception { shard = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE)); shard.markAsRecovering("peer recovery", new RecoveryState(shard.routingEntry(), pNode, rNode)); shard.prepareForIndexRecovery(); - long startingSeqNo = shard.recoverLocallyUpToGlobalCheckpoint(); + long startingSeqNo = shard.recoverLocallyAndFetchStartSeqNo(true); shard.store().markStoreCorrupted(new IOException("simulated")); RecoveryTarget recoveryTarget = new RecoveryTarget(shard, null, null); StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest(logger, rNode, recoveryTarget, startingSeqNo); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 09eca006d600a..1fcdfd79c544e 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -134,7 +134,6 @@ import java.io.IOException; import java.util.ArrayList; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -853,7 +852,9 @@ protected final void recoverUnstartedReplica( } replica.prepareForIndexRecovery(); final RecoveryTarget recoveryTarget = targetSupplier.apply(replica, pNode); - final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint(); + IndexShard indexShard = recoveryTarget.indexShard(); + boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); + final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled); final StartRecoveryRequest request = PeerRecoveryTargetService.getStartRecoveryRequest( logger, rNode,