diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1f040b5ff9be6..0df1fe83a06e1 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3671,6 +3671,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); + if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { + internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); + } + if (isRemoteStoreEnabled()) { internalRefreshListener.add( new RemoteStoreRefreshListener( @@ -3682,9 +3686,6 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro ); } - if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { - internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); - } /** * With segment replication enabled for primary relocation, recover replica shard initially as read only and * change to a writeable engine during relocation handoff after a round of segment replication. diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index e37303ad3e323..77c9d431bfafc 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -197,6 +197,8 @@ private synchronized void syncSegments(boolean isRetry) { logger.info("syncSegments is only supported for InternalEngine, called with {}. Skipping", indexShard.getEngine()); return; } + ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); + indexShard.onCheckpointPublished(checkpoint); beforeSegmentsSync(isRetry); long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); @@ -220,7 +222,6 @@ private synchronized void syncSegments(boolean isRetry) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. - ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); Collection localSegmentsPostRefresh = segmentInfos.files(true); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index ac36b11751351..fffb8958b4b34 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2631,6 +2631,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throws IOException { Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) @@ -4759,6 +4760,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOException { Settings primarySettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 21a9393408529..6f38f080e5035 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -56,12 +56,17 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( primary, - Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(), new InternalEngineFactory() ); - indexDocs(1, numberOfDocs); - indexShard.refresh("test"); + if (primary) { + indexDocs(1, numberOfDocs); + indexShard.refresh("test"); + } clusterService = new ClusterService( Settings.EMPTY,