Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start replication checkpointTimers on primary before segments upload to remote store. #8221

Merged
merged 4 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))
- Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029)
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221)
- Add distributed tracing framework ([#7543](https://github.com/opensearch-project/OpenSearch/issues/7543))
- Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168))
- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3661,6 +3661,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
new RemoteStoreRefreshListener(
Expand All @@ -3672,9 +3676,6 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* change to a writeable engine during relocation handoff after a round of segment replication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ private synchronized void syncSegments(boolean isRetry) {
if (indexShard.getReplicationTracker().isPrimaryMode() == false) {
return;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
indexShard.onCheckpointPublished(checkpoint);
beforeSegmentsSync(isRetry);
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -209,7 +211,6 @@ private synchronized void syncSegments(boolean isRetry) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2692,6 +2692,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throws IOException {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down Expand Up @@ -4815,6 +4816,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce
public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOException {
Settings primarySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,17 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
public void setup(boolean primary, int numberOfDocs) throws IOException {
indexShard = newStartedShard(
primary,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build(),
new InternalEngineFactory()
);

indexDocs(1, numberOfDocs);
indexShard.refresh("test");
if (primary) {
indexDocs(1, numberOfDocs);
indexShard.refresh("test");
}

clusterService = new ClusterService(
Settings.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -44,9 +45,13 @@ public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelRepli
@Override
public void setUp() throws Exception {
super.setUp();

indexShard = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build(),
new InternalEngineFactory()
);

Expand Down
Loading