Skip to content

Commit

Permalink
Start replication timer before segments upload.
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <ankikala@amazon.com>
  • Loading branch information
ankitkala committed Jun 22, 2023
1 parent 63dc6aa commit 6fdf3cd
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3603,6 +3603,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}

if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
new RemoteStoreRefreshListener(
Expand All @@ -3614,9 +3618,6 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* change to a writeable engine during relocation handoff after a round of segment replication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ private synchronized void syncSegments(boolean isRetry) {
if (indexShard.getReplicationTracker().isPrimaryMode() == false) {
return;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
if (indexShard.indexSettings().isSegRepWithRemoteEnabled()) {
indexShard.onCheckpointPublished(checkpoint);
}
beforeSegmentsSync(isRetry);
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -202,7 +206,6 @@ private synchronized void syncSegments(boolean isRetry) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public SegmentReplicationCheckpointPublisher(PublishAction publishAction) {

public void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
publishAction.publish(indexShard, checkpoint);
indexShard.onCheckpointPublished(checkpoint);
if (indexShard.indexSettings().isSegRepWithRemoteEnabled() == false) {
indexShard.onCheckpointPublished(checkpoint);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2689,6 +2689,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throws IOException {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down Expand Up @@ -4740,6 +4741,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce
public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOException {
Settings primarySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -44,9 +45,11 @@ public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelRepli
@Override
public void setUp() throws Exception {
super.setUp();

indexShard = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(),
new InternalEngineFactory()
);

Expand Down

0 comments on commit 6fdf3cd

Please sign in to comment.