Skip to content

Commit

Permalink
Start replication checkpointTimers on primary before segments upload …
Browse files Browse the repository at this point in the history
…to remote store. (opensearch-project#8221)

* Start replication timer before segments upload.

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

---------

Signed-off-by: Ankit Kala <ankikala@amazon.com>
  • Loading branch information
ankitkala authored and vikasvb90 committed Jul 12, 2023
1 parent 235e483 commit 514c3de
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))
- Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029)
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221)
- Add distributed tracing framework ([#7543](https://github.com/opensearch-project/OpenSearch/issues/7543))
- Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168))
- [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3661,6 +3661,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
new RemoteStoreRefreshListener(
Expand All @@ -3672,9 +3676,6 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* change to a writeable engine during relocation handoff after a round of segment replication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ private synchronized void syncSegments(boolean isRetry) {
if (indexShard.getReplicationTracker().isPrimaryMode() == false) {
return;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
indexShard.onCheckpointPublished(checkpoint);
beforeSegmentsSync(isRetry);
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -214,7 +216,6 @@ private synchronized void syncSegments(boolean isRetry) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2692,6 +2692,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throws IOException {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down Expand Up @@ -4815,6 +4816,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce
public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOException {
Settings primarySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,17 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
public void setup(boolean primary, int numberOfDocs) throws IOException {
indexShard = newStartedShard(
primary,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build(),
new InternalEngineFactory()
);

indexDocs(1, numberOfDocs);
indexShard.refresh("test");
if (primary) {
indexDocs(1, numberOfDocs);
indexShard.refresh("test");
}

clusterService = new ClusterService(
Settings.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -44,9 +45,13 @@ public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelRepli
@Override
public void setUp() throws Exception {
super.setUp();

indexShard = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build(),
new InternalEngineFactory()
);

Expand Down

0 comments on commit 514c3de

Please sign in to comment.