diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2c1ffd0dccc62..24d895a5da504 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -134,6 +134,7 @@ import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; @@ -3034,7 +3035,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() { * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary. * * @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object) - * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener) + * @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { assert seqNo != UNASSIGNED_SEQ_NO diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 0b7238bc2e25d..595a5935994b7 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -515,17 +515,21 @@ public void onTimeout(TimeValue timeout) { } }); }; - recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), - request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(), - ActionListener.wrap( - checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)), - e -> { - if (e instanceof MapperException) { - retryOnMappingException.accept(e); - } else { - listener.onFailure(e); - } - }) + recoveryTarget.indexTranslogOperations( + request.operations(), + request.totalTranslogOps(), + request.maxSeenAutoIdTimestampOnPrimary(), + request.maxSeqNoOfUpdatesOrDeletesOnPrimary(), + request.retentionLeases(), + ActionListener.wrap( + checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)), + e -> { + if (e instanceof MapperException) { + retryOnMappingException.accept(e); + } else { + listener.onFailure(e); + } + }) ); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index f360a68b7a83c..a6973a46926a7 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -231,8 +232,16 @@ public void recoverToTarget(ActionListener listener) { // are at least as high as the corresponding values on the primary when any of these operations were executed on it. final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes(); - phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, - maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep); + final RetentionLeases retentionLeases = shard.getRetentionLeases(); + phase2( + startingSeqNo, + requiredSeqNoRangeStart, + endingSeqNo, + phase2Snapshot, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + sendSnapshotStep); sendSnapshotStep.whenComplete( r -> IOUtils.close(phase2Snapshot), e -> { @@ -517,8 +526,15 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it. * @param listener a listener which will be notified with the local checkpoint on the target. */ - void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { + void phase2( + final long startingSeqNo, + final long requiredSeqNoRangeStart, + final long endingSeqNo, + final Translog.Snapshot snapshot, + final long maxSeenAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo + 1: "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : @@ -584,25 +600,50 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, listener::onFailure ); - sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(), - maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener); + sendBatch( + readNextBatch, + true, + SequenceNumbers.UNASSIGNED_SEQ_NO, + snapshot.totalOperations(), + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + batchedListener); } - private void sendBatch(CheckedSupplier, IOException> nextBatch, boolean firstBatch, - long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) throws IOException { + private void sendBatch( + final CheckedSupplier, IOException> nextBatch, + final boolean firstBatch, + final long targetLocalCheckpoint, + final int totalTranslogOps, + final long maxSeenAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener) throws IOException { final List operations = nextBatch.get(); // send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint if (operations.isEmpty() == false || firstBatch) { cancellableThreads.execute(() -> { - recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.wrap( - newCheckpoint -> { - sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), - totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener); - }, - listener::onFailure - )); + recoveryTarget.indexTranslogOperations( + operations, + totalTranslogOps, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + ActionListener.wrap( + newCheckpoint -> { + sendBatch( + nextBatch, + false, + SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint), + totalTranslogOps, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + listener); + }, + listener::onFailure + )); }); } else { listener.onResponse(targetLocalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index d24c3773d264c..e8c645623893a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; @@ -400,8 +401,13 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, - long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestampOnPrimary, + final long maxSeqNoOfDeletesOrUpdatesOnPrimary, + final RetentionLeases retentionLeases, + final ActionListener listener) { ActionListener.completeWith(listener, () -> { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); @@ -421,6 +427,11 @@ public void indexTranslogOperations(List operations, int tot * replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on. */ indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary); + /* + * We have to update the retention leases before we start applying translog operations to ensure we are retaining according to + * the policy. + */ + indexShard().updateRetentionLeasesOnReplica(retentionLeases); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index b1ddc80e77eb3..be7c00d52c94d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -39,8 +40,8 @@ public interface RecoveryTargetHandler { void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener listener); /** - * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and - * updates the global checkpoint. + * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates + * the global checkpoint. * * @param globalCheckpoint the global checkpoint on the recovery source * @param listener the listener which will be notified when this method is completed @@ -68,11 +69,17 @@ public interface RecoveryTargetHandler { * @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on * the primary shard when capturing these operations. This value is at least as high as the * max_seq_no_of_updates on the primary was when any of these ops were processed on it. + * @param retentionLeases the retention leases on the primary * @param listener a listener which will be notified with the local checkpoint on the target * after these operations are successfully indexed on the target. */ - void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, - long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener listener); + void indexTranslogOperations( + List operations, + int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary, + long maxSeqNoOfUpdatesOrDeletesOnPrimary, + RetentionLeases retentionLeases, + ActionListener listener); /** * Notifies the target of the files it is going to receive diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index 0ae5d507eb357..495d4cabd8bb7 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; @@ -39,18 +40,26 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest { private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; private long maxSeenAutoIdTimestampOnPrimary; private long maxSeqNoOfUpdatesOrDeletesOnPrimary; + private RetentionLeases retentionLeases; public RecoveryTranslogOperationsRequest() { } - RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, int totalTranslogOps, - long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) { + RecoveryTranslogOperationsRequest( + final long recoveryId, + final ShardId shardId, + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestampOnPrimary, + final long maxSeqNoOfUpdatesOrDeletesOnPrimary, + final RetentionLeases retentionLeases) { this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; this.totalTranslogOps = totalTranslogOps; this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary; this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary; + this.retentionLeases = retentionLeases; } public long recoveryId() { @@ -77,6 +86,10 @@ public long maxSeqNoOfUpdatesOrDeletesOnPrimary() { return maxSeqNoOfUpdatesOrDeletesOnPrimary; } + public RetentionLeases retentionLeases() { + return retentionLeases; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -95,6 +108,11 @@ public void readFrom(StreamInput in) throws IOException { // UNASSIGNED_SEQ_NO means uninitialized and replica won't enable optimization using seq_no maxSeqNoOfUpdatesOrDeletesOnPrimary = SequenceNumbers.UNASSIGNED_SEQ_NO; } + if (in.getVersion().onOrAfter(Version.V_6_7_0)) { + retentionLeases = new RetentionLeases(in); + } else { + retentionLeases = RetentionLeases.EMPTY; + } } @Override @@ -110,5 +128,8 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_6_5_0)) { out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary); } + if (out.getVersion().onOrAfter(Version.V_6_7_0)) { + retentionLeases.writeTo(out); + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 0799cc6595189..436d59dba85ad 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -113,10 +114,21 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary, - long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestampOnPrimary, + final long maxSeqNoOfDeletesOrUpdatesOnPrimary, + final RetentionLeases retentionLeases, + final ActionListener listener) { final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest( - recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary); + recoveryId, + shardId, + operations, + totalTranslogOps, + maxSeenAutoIdTimestampOnPrimary, + maxSeqNoOfDeletesOrUpdatesOnPrimary, + retentionLeases); transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions, new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure), RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC)); diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index d2184c13af310..87068836d0945 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.mapper.SourceToParse; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -503,13 +504,17 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { AtomicBoolean opsSent = new AtomicBoolean(false); final Future recoveryFuture = shards.asyncRecoverReplica(newReplica, (indexShard, node) -> { recoveryStart.countDown(); - return new RecoveryTarget(indexShard, node, recoveryListener, l -> { - }) { + return new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, - long maxSeenAutoIdTimestamp, long msu, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestamp, + final long msu, + final RetentionLeases retentionLeases, + final ActionListener listener) { opsSent.set(true); - super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, listener); + super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, msu, retentionLeases, listener); } }; }); @@ -576,9 +581,13 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public void indexTranslogOperations(final List operations, final int totalTranslogOps, - final long maxAutoIdTimestamp, long maxSeqNoOfUpdates, - ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdates, + final RetentionLeases retentionLeases, + final ActionListener listener) { // index a doc which is not part of the snapshot, but also does not complete on replica replicaEngineFactory.latchIndexers(1); threadPool.generic().submit(() -> { @@ -605,7 +614,13 @@ public void indexTranslogOperations(final List operations, f } catch (InterruptedException e) { throw new AssertionError(e); } - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener); + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxAutoIdTimestamp, + maxSeqNoOfUpdates, + retentionLeases, + listener); } }); pendingDocActiveWithExtraDocIndexed.await(); @@ -847,12 +862,17 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) { } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, - long maxAutoIdTimestamp, long maxSeqNoOfUpdates, ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdates, + final RetentionLeases retentionLeases, + final ActionListener listener) { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, listener); + super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, listener); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index fffd29531c960..b189cf8b20615 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -20,10 +20,12 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; @@ -71,7 +73,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final String source = randomAlphaOfLength(8); final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); - // simulate a peer-recovery which locks the soft-deletes policy on the primary. + // simulate a peer recovery which locks the soft deletes policy on the primary final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); @@ -171,4 +173,68 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { } } + public void testRetentionLeasesSyncOnRecovery() throws Exception { + final int numberOfReplicas = 1; + /* + * We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only + * source of retention leases on the replicas would be from the commit point and recovery. + */ + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueHours(24)) + .build(); + createIndex("index", settings); + ensureYellow("index"); + // exclude the replicas from being allocated + allowNodes("index", 1); + final AcknowledgedResponse response = client().admin() + .indices() + .prepareUpdateSettings("index").setSettings(Settings.builder().put("index.number_of_replicas", numberOfReplicas).build()) + .get(); + assertTrue(response.isAcknowledged()); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final int length = randomIntBetween(1, 8); + final Map currentRetentionLeases = new HashMap<>(); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); + latch.await(); + /* + * Now renew the leases; since we do not flush immediately on renewal, this means that the latest retention leases will not be + * in the latest commit point and therefore not transferred during the file-copy phase of recovery. + */ + currentRetentionLeases.put(id, primary.renewRetentionLease(id, retainingSequenceNumber, source)); + } + + // now allow the replicas to be allocated and wait for recovery to finalize + allowNodes("index", 1 + numberOfReplicas); + ensureGreen("index"); + + // check current retention leases have been synced to all replicas + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); + assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); + + // check retention leases have been committed on the replica + final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( + replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases))); + } + } + } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 7a5e8e35f3628..05866ebaa0709 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -102,6 +102,7 @@ import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -2428,10 +2429,20 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener){ - super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded()))); + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxSeenAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener){ + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxSeenAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded()))); } }, true, true); @@ -2535,14 +2546,26 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener){ - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.wrap(checkpoint -> { - listener.onResponse(checkpoint); - // Shard should now be active since we did recover: - assertTrue(replica.isActive()); - }, listener::onFailure)); + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener){ + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + ActionListener.wrap( + checkpoint -> { + listener.onResponse(checkpoint); + // Shard should now be active since we did recover: + assertTrue(replica.isActive()); + }, + listener::onFailure)); } }, false, true); @@ -2585,13 +2608,25 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long maxAutoIdTimestamp, - long maxSeqNoOfUpdatesOrDeletes, ActionListener listener) { - super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, - ActionListener.wrap(checkpoint -> { - assertListenerCalled.accept(replica); - listener.onResponse(checkpoint); - }, listener::onFailure)); + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long maxAutoIdTimestamp, + final long maxSeqNoOfUpdatesOrDeletes, + final RetentionLeases retentionLeases, + final ActionListener listener) { + super.indexTranslogOperations( + operations, + totalTranslogOps, + maxAutoIdTimestamp, + maxSeqNoOfUpdatesOrDeletes, + retentionLeases, + ActionListener.wrap( + checkpoint -> { + assertListenerCalled.accept(replica); + listener.onResponse(checkpoint); + }, + listener::onFailure)); } @Override diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 971504ff46377..be2c4c036baee 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -238,7 +239,7 @@ public void testSendSnapshotSendsOps() throws IOException { RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, - ActionListener listener) { + RetentionLeases retentionLeases, ActionListener listener) { shippedOps.addAll(operations); checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE)); maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get())); @@ -247,7 +248,7 @@ public void indexTranslogOperations(List operations, int tot RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10)); PlainActionFuture future = new PlainActionFuture<>(); handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()), - randomNonNegativeLong(), randomNonNegativeLong(), future); + randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); RecoverySourceHandler.SendSnapshotResult result = future.actionGet(); assertThat(result.totalOperations, equalTo(expectedOps)); @@ -265,7 +266,7 @@ public void indexTranslogOperations(List operations, int tot PlainActionFuture failedFuture = new PlainActionFuture<>(); expectThrows(IllegalStateException.class, () -> { handler.phase2(startingSeqNo, requiredStartingSeqNo, endingSeqNo, newTranslogSnapshot(operations, opsToSkip), - randomNonNegativeLong(), randomNonNegativeLong(), failedFuture); + randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, failedFuture); failedFuture.actionGet(); }); } @@ -285,7 +286,7 @@ public void testSendSnapshotStopOnError() throws Exception { RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() { @Override public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, - long msu, ActionListener listener) { + long msu, RetentionLeases retentionLeases, ActionListener listener) { if (randomBoolean()) { maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED)); } else { @@ -299,7 +300,7 @@ public void indexTranslogOperations(List operations, int tot final long startingSeqNo = randomLongBetween(0, ops.size() - 1L); final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L); handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()), - randomNonNegativeLong(), randomNonNegativeLong(), future); + randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future); if (wasFailed.get()) { assertThat(expectThrows(RuntimeException.class, () -> future.actionGet()).getMessage(), equalTo("test - failed to index")); } @@ -498,11 +499,11 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A @Override void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, - long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, + long maxSeenAutoIdTimestamp, long maxSeqNoOfUpdatesOrDeletes, RetentionLeases retentionLeases, ActionListener listener) throws IOException { phase2Called.set(true); super.phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, - maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener); + maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, listener); } }; @@ -716,8 +717,13 @@ public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryConte } @Override - public void indexTranslogOperations(List operations, int totalTranslogOps, long timestamp, long msu, - ActionListener listener) { + public void indexTranslogOperations( + final List operations, + final int totalTranslogOps, + final long timestamp, + final long msu, + final RetentionLeases retentionLeases, + final ActionListener listener) { } @Override