Skip to content

Commit

Permalink
Start replication checkpointTimers on primary before segments upload …
Browse files Browse the repository at this point in the history
…to remote store. (opensearch-project#8221)

* Start replication timer before segments upload.

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

---------

Signed-off-by: Ankit Kala <ankikala@amazon.com>
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ankitkala authored and ashking94 committed Jul 18, 2023
1 parent 9762871 commit 280ef1e
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3671,6 +3671,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
new RemoteStoreRefreshListener(
Expand All @@ -3682,9 +3686,6 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* change to a writeable engine during relocation handoff after a round of segment replication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ private synchronized void syncSegments(boolean isRetry) {
logger.info("syncSegments is only supported for InternalEngine, called with {}. Skipping", indexShard.getEngine());
return;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
indexShard.onCheckpointPublished(checkpoint);
beforeSegmentsSync(isRetry);
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -220,7 +222,6 @@ private synchronized void syncSegments(boolean isRetry) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2631,6 +2631,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throws IOException {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down Expand Up @@ -4759,6 +4760,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce
public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOException {
Settings primarySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,17 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
public void setup(boolean primary, int numberOfDocs) throws IOException {
indexShard = newStartedShard(
primary,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build(),
new InternalEngineFactory()
);

indexDocs(1, numberOfDocs);
indexShard.refresh("test");
if (primary) {
indexDocs(1, numberOfDocs);
indexShard.refresh("test");
}

clusterService = new ClusterService(
Settings.EMPTY,
Expand Down

0 comments on commit 280ef1e

Please sign in to comment.