From 760dfe395ae24853d6a615931d89637c5c5f8ea4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 4 Feb 2019 18:13:31 -0500 Subject: [PATCH 1/2] Lift retention lease expiration to index shard This commit lifts control of retention lease expiration to the index shard. We move expiration to an explicit action rather than a side effect of calling ReplicationTracker#getRetentionLeases. This explicit action is invoked on a timer. If any retention leases expire, then we hard sync the retention leases to the replicas. Otherwise, we proceed with a background sync. --- .../org/elasticsearch/index/IndexService.java | 22 +-- .../elasticsearch/index/IndexSettings.java | 5 + .../index/seqno/ReplicationTracker.java | 74 +++++----- .../elasticsearch/index/shard/IndexShard.java | 26 +++- ...ReplicationTrackerRetentionLeaseTests.java | 134 ++++-------------- .../index/seqno/RetentionLeaseSyncIT.java | 72 +++++++--- .../shard/IndexShardRetentionLeaseTests.java | 69 +++++---- 7 files changed, 193 insertions(+), 209 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 57d8cc0b32641..1b1784495e685 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -121,7 +121,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; private volatile AsyncGlobalCheckpointTask globalCheckpointTask; - private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask; + private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask; // don't convert to Setting<> and register...
we only set this in tests and register via a plugin private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; @@ -198,7 +198,7 @@ public IndexService( this.refreshTask = new AsyncRefreshTask(this); this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); - this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this); + this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); rescheduleFsyncTask(indexSettings.getTranslogDurability()); } @@ -289,7 +289,7 @@ public synchronized void close(final String reason, boolean delete) throws IOExc fsyncTask, trimTranslogTask, globalCheckpointTask, - retentionLeaseBackgroundSyncTask); + retentionLeaseSyncTask); } } } @@ -788,8 +788,8 @@ private void maybeSyncGlobalCheckpoints() { sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint"); } - private void backgroundSyncRetentionLeases() { - sync(IndexShard::backgroundSyncRetentionLeases, "retention lease"); + private void syncRetentionLeases() { + sync(IndexShard::syncRetentionLeases, "retention lease"); } private void sync(final Consumer sync, final String source) { @@ -812,11 +812,11 @@ private void sync(final Consumer sync, final String source) { && e instanceof IndexShardClosedException == false) { logger.warn( new ParameterizedMessage( - "{} failed to execute background {} sync", shard.shardId(), source), e); + "{} failed to execute {} sync", shard.shardId(), source), e); } }, ThreadPool.Names.SAME, - "background " + source + " sync"); + source + " sync"); } catch (final AlreadyClosedException | IndexShardClosedException e) { // the shard was closed concurrently, continue } @@ -957,15 +957,15 @@ public String toString() { } } - final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask { + final class AsyncRetentionLeaseSyncTask extends BaseAsyncTask { - AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) { + AsyncRetentionLeaseSyncTask(final IndexService indexService) { super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); } @Override protected void runInternal() { - indexService.backgroundSyncRetentionLeases(); + indexService.syncRetentionLeases(); } @Override @@ -975,7 +975,7 @@ protected String getThreadPool() { @Override public String toString() { - return "retention_lease_background_sync"; + return "retention_lease_sync"; } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 4d9a8f7d37b70..97b499f9bd309 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -339,6 +339,10 @@ public long getRetentionLeaseMillis() { return retentionLeaseMillis; } + private void setRetentionLeaseMillis(final TimeValue retentionLease) { + this.retentionLeaseMillis = retentionLease.millis(); + } + private volatile boolean warmerEnabled; private volatile int maxResultWindow; private volatile int maxInnerResultWindow; @@ -523,6 +527,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); 
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); + scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING, this::setRetentionLeaseMillis); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 3b68dfa6addae..22385b7bd2f0c 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -155,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final LongSupplier currentTimeMillisSupplier; /** - * A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the - * retention lease sync action, to sync retention leases to replicas. + * A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync + * retention leases to replicas. */ - private final BiConsumer> onSyncRetentionLeases; + private final BiConsumer> onAddRetentionLease; /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the @@ -177,43 +178,46 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private RetentionLeases retentionLeases = RetentionLeases.EMPTY; /** - * Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired, - * and if any have expired, syncs the retention leases to any replicas. + * Get all retention leases tracked on this shard. * * @return the retention leases */ public RetentionLeases getRetentionLeases() { - final boolean wasPrimaryMode; + return getRetentionLeases(false).v2(); + } + + /** + * If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates + * expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the + * primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the + * expire leases parameter is true, this replication tracker must be in primary mode. 
+ * + * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases + */ + public Tuple getRetentionLeases(final boolean expireLeases) { final RetentionLeases nonExpiredRetentionLeases; synchronized (this) { - if (primaryMode) { - // the primary calculates the non-expired retention leases and syncs them to replicas - final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); - final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); - final Map> partitionByExpiration = retentionLeases - .leases() - .stream() - .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); - if (partitionByExpiration.get(true) == null) { - // early out as no retention leases have expired - return retentionLeases; - } - final Collection nonExpiredLeases = - partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList(); - retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); + if (expireLeases == false) { + return Tuple.tuple(false, retentionLeases); } - /* - * At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or - * we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the - * non-expired retention leases, instead receiving them on syncs from the primary. - */ - wasPrimaryMode = primaryMode; + assert primaryMode; + // the primary calculates the non-expired retention leases and syncs them to replicas + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); + final Map> partitionByExpiration = retentionLeases + .leases() + .stream() + .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); + if (partitionByExpiration.get(true) == null) { + // early out as no retention leases have expired + return Tuple.tuple(false, retentionLeases); + } + final Collection nonExpiredLeases = + partitionByExpiration.get(false) != null ? 
partitionByExpiration.get(false) : Collections.emptyList(); + retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); nonExpiredRetentionLeases = retentionLeases; } - if (wasPrimaryMode) { - onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {})); - } - return nonExpiredRetentionLeases; + return Tuple.tuple(true, nonExpiredRetentionLeases); } /** @@ -246,7 +250,7 @@ public RetentionLease addRetentionLease( Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())); currentRetentionLeases = retentionLeases; } - onSyncRetentionLeases.accept(currentRetentionLeases, listener); + onAddRetentionLease.accept(currentRetentionLeases, listener); return retentionLease; } @@ -563,7 +567,7 @@ private static long inSyncCheckpointStates( * @param indexSettings the index settings * @param operationPrimaryTerm the current primary term * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} - * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires + * @param onAddRetentionLease a callback when a new retention lease is created */ public ReplicationTracker( final ShardId shardId, @@ -573,7 +577,7 @@ public ReplicationTracker( final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer> onSyncRetentionLeases) { + final BiConsumer> onAddRetentionLease) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -585,7 +589,7 @@ public ReplicationTracker( checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); - this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases); + this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c3d653e2fde06..dfecdc173951e 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1892,13 +1892,26 @@ public void addGlobalCheckpointListener( } /** - * Get all non-expired retention leases tracked on this shard. + * Get all retention leases tracked on this shard. * * @return the retention leases */ public RetentionLeases getRetentionLeases() { + return getRetentionLeases(false).v2(); + } + + /** + * If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates + * expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the + * primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the + * expire leases parameter is true, this shard must be in primary mode.
+ * + * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases + */ + public Tuple getRetentionLeases(final boolean expireLeases) { + assert expireLeases == false || assertPrimaryMode(); verifyNotClosed(); - return replicationTracker.getRetentionLeases(); + return replicationTracker.getRetentionLeases(expireLeases); } public RetentionLeaseStats getRetentionLeaseStats() { @@ -1956,10 +1969,15 @@ public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases /** * Syncs the current retention leases to all replicas. */ - public void backgroundSyncRetentionLeases() { + public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); - retentionLeaseSyncer.backgroundSync(shardId, getRetentionLeases()); + final Tuple retentionLeases = getRetentionLeases(true); + if (retentionLeases.v1()) { + retentionLeaseSyncer.sync(shardId, retentionLeases.v2(), ActionListener.wrap(() -> {})); + } else { + retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2()); + } } /** diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 520344489adf9..bb526a3470873 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.LongSupplier; import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -46,7 +45,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase { @@ -78,7 +76,7 @@ public void testAddOrRenewRetentionLease() { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); replicationTracker.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true); + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); } for (int i = 0; i < length; i++) { @@ -88,7 +86,7 @@ public void testAddOrRenewRetentionLease() { } minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true); + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, primaryTerm, 1 + length + i, true, false); } } @@ -193,7 +191,7 @@ private void runExpirationTest(final boolean primaryMode) { assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); 
- assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primaryMode); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 1, primaryMode, false); } // renew the lease @@ -215,108 +213,20 @@ private void runExpirationTest(final boolean primaryMode) { assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primaryMode); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, primaryMode, false); } // now force the lease to expire currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primaryMode) { - assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, true, false); + assertRetentionLeases(replicationTracker, 0, new long[0], primaryTerm, 3, true, true); } else { // leases do not expire on replicas until synced from the primary - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, false, false); } } - public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { - final AllocationId allocationId = AllocationId.newInitializing(); - final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); - final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); - final Settings settings = Settings - .builder() - .put( - IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), - TimeValue.timeValueMillis(retentionLeaseMillis)) - .build(); - final Map> retentionLeases = new HashMap<>(); - final AtomicBoolean invoked = new AtomicBoolean(); - final AtomicReference reference = new AtomicReference<>(); - final ReplicationTracker replicationTracker = new ReplicationTracker( - new ShardId("test", "_na", 0), - allocationId.getId(), - IndexSettingsModule.newIndexSettings("test", settings), - randomNonNegativeLong(), - UNASSIGNED_SEQ_NO, - value -> {}, - currentTimeMillis::get, - (leases, listener) -> { - // we do not want to hold a lock on the replication tracker in the callback! 
- assertFalse(Thread.holdsLock(reference.get())); - invoked.set(true); - assertThat( - leases.leases() - .stream() - .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), - equalTo(retentionLeases)); - }); - reference.set(replicationTracker); - replicationTracker.updateFromMaster( - randomNonNegativeLong(), - Collections.singleton(allocationId.getId()), - routingTable(Collections.emptySet(), allocationId), - Collections.emptySet()); - replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); - - final int length = randomIntBetween(0, 8); - long version = 0; - for (int i = 0; i < length; i++) { - final String id = randomAlphaOfLength(8); - final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); - replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); - version++; - assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); - // assert that the new retention lease callback was invoked - assertTrue(invoked.get()); - - // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease - invoked.set(false); - currentTimeMillis.set(1 + currentTimeMillis.get()); - retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); - replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); - version++; - assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); - - // reset the invocation marker so that we can assert the callback was invoked if any leases are expired - assertFalse(invoked.get()); - // randomly expire some leases - final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()); - // calculate the expired leases and update our tracking map - final List expiredIds = retentionLeases.entrySet() - .stream() - .filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement > r.getValue().v2() + retentionLeaseMillis) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - expiredIds.forEach(retentionLeases::remove); - if (expiredIds.isEmpty() == false) { - version++; - } - currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement); - // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback - final RetentionLeases current = replicationTracker.getRetentionLeases(); - assertThat(current.version(), equalTo(version)); - // the current leases should equal our tracking map - assertThat( - current.leases() - .stream() - .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), - equalTo(retentionLeases)); - // the callback should only be invoked if there were expired leases - assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false)); - } - assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); - } - public void testReplicaIgnoresOlderRetentionLeasesVersion() { final AllocationId allocationId = AllocationId.newInitializing(); final ReplicationTracker replicationTracker = new ReplicationTracker( @@ -370,19 +280,29 @@ public void testReplicaIgnoresOlderRetentionLeasesVersion() { } } - private static Tuple toTuple(final RetentionLease retentionLease) { - return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp()); - } 
- private void assertRetentionLeases( final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier, final long primaryTerm, final long version, - final boolean primaryMode) { - final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); + final boolean primaryMode, + final boolean expireLeases) { + assertTrue(expireLeases == false || primaryMode); + final RetentionLeases retentionLeases; + if (expireLeases == false) { + if (randomBoolean()) { + retentionLeases = replicationTracker.getRetentionLeases(); + } else { + final Tuple tuple = replicationTracker.getRetentionLeases(false); + assertFalse(tuple.v1()); + retentionLeases = tuple.v2(); + } + } else { + final Tuple tuple = replicationTracker.getRetentionLeases(true); + assertTrue(tuple.v1()); + retentionLeases = tuple.v2(); + } assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); assertThat(retentionLeases.version(), equalTo(version)); final Map idToRetentionLease = new HashMap<>(); @@ -395,12 +315,6 @@ private void assertRetentionLeases( assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); - if (primaryMode) { - // retention leases can be expired on replicas, so we can only assert on primaries here - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis())); - } assertThat(retentionLease.source(), equalTo("test-" + i)); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index 3e69c84e3cde3..2eb0b54f36127 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -20,33 +20,58 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; @ESIntegTestCase.ClusterScope(scope = 
ESIntegTestCase.Scope.TEST) public class RetentionLeaseSyncIT extends ESIntegTestCase { + public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING); + } + + } + + @Override + protected Collection> nodePlugins() { + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class)) + .collect(Collectors.toList()); + } + public void testRetentionLeasesSyncedOnAdd() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); @@ -99,7 +124,6 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37963") public void testRetentionLeasesSyncOnExpiration() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); @@ -109,7 +133,7 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", numberOfReplicas) - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) .build(); createIndex("index", settings); ensureGreen("index"); @@ -121,6 +145,17 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { // we will add multiple retention leases, wait for some to expire, and assert a consistent view between the primary and the replicas final int length = randomIntBetween(1, 8); for (int i = 0; i < length; i++) { + // update the index for retention leases to live a long time + final AcknowledgedResponse longTtlResponse = client().admin() + .indices() + .prepareUpdateSettings("index") + .setSettings( + Settings.builder() + .putNull(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey()) + .build()) + .get(); + assertTrue(longTtlResponse.isAcknowledged()); + + final String id = randomAlphaOfLength(8); final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); final String source = randomAlphaOfLength(8); @@ -137,19 +172,26 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(replica.getRetentionLeases().leases(), hasItem(currentRetentionLease)); + assertThat(replica.getRetentionLeases().leases(), anyOf(empty(), contains(currentRetentionLease))); } - // sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have + // update the index for retention leases to live a short time, to force expiration + final AcknowledgedResponse shortTtlResponse = client().admin() + .indices() + .prepareUpdateSettings("index") + .setSettings( + Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive) + .build()) + .get(); + assertTrue(shortTtlResponse.isAcknowledged()); + + // sleep long enough that the current retention lease has expired final long later = System.nanoTime(); Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - 
TimeUnit.NANOSECONDS.toMillis(later - now))); - final RetentionLeases currentRetentionLeases = primary.getRetentionLeases(); - assertThat(currentRetentionLeases.leases(), anyOf(empty(), contains(currentRetentionLease))); + assertBusy(() -> assertThat(primary.getRetentionLeases().leases(), empty())); - /* - * Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in - * the background. - */ + // now that all retention leases are expired should have been synced to all replicas assertBusy(() -> { for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { final String replicaShardNodeId = replicaShard.currentNodeId(); @@ -157,13 +199,7 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - if (currentRetentionLeases.leases().isEmpty()) { - assertThat(replica.getRetentionLeases().leases(), empty()); - } else { - assertThat( - replica.getRetentionLeases().leases(), - contains(currentRetentionLeases.leases().toArray(new RetentionLease[0]))); - } + assertThat(replica.getRetentionLeases().leases(), empty()); } }); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 75d8d7e8e2679..cc64fc6f8b2de 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -43,14 +44,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongSupplier; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -85,13 +84,20 @@ public void testAddOrRenewRetentionLease() throws IOException { indexShard.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); assertRetentionLeases( - indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true); + indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true); + 
assertRetentionLeases( + indexShard, + length, + minimumRetainingSequenceNumbers, + primaryTerm, + 1 + length + i, + true, + false); } } finally { closeShards(indexShard); @@ -121,8 +127,7 @@ private void runExpirationTest(final boolean primary) throws IOException { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE); if (primary) { - indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> { - })); + indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); } else { final RetentionLeases retentionLeases = new RetentionLeases( primaryTerm, @@ -137,7 +142,7 @@ private void runExpirationTest(final boolean primary) throws IOException { assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primary); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 1, primary, false); } // renew the lease @@ -159,16 +164,17 @@ private void runExpirationTest(final boolean primary) throws IOException { assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primary); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, primary, false); } // now force the lease to expire currentTimeMillis.set( currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primary) { - assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, true, false); + assertRetentionLeases(indexShard, 0, new long[0], primaryTerm, 3, true, true); } else { - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, false, false); } } finally { closeShards(indexShard); @@ -191,8 +197,7 @@ public void testCommit() throws IOException { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong())); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { - })); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); } currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE)); @@ -250,13 +255,10 @@ public void testRetentionLeaseStats() throws IOException { final RetentionLeaseStats stats = indexShard.getRetentionLeaseStats(); assertRetentionLeases( stats.retentionLeases(), - indexShard.indexSettings().getRetentionLeaseMillis(), length, minimumRetainingSequenceNumbers, - () -> 0L, length == 0 ? 
RetentionLeases.EMPTY.primaryTerm() : indexShard.getOperationPrimaryTerm(), - length, - true); + length); } finally { closeShards(indexShard); } @@ -266,30 +268,39 @@ private void assertRetentionLeases( final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier, final long primaryTerm, final long version, - final boolean primary) { + final boolean primary, + final boolean expireLeases) { + assertTrue(expireLeases == false || primary); + final RetentionLeases retentionLeases; + if (expireLeases == false) { + if (randomBoolean()) { + retentionLeases = indexShard.getRetentionLeases(); + } else { + final Tuple tuple = indexShard.getRetentionLeases(false); + assertFalse(tuple.v1()); + retentionLeases = tuple.v2(); + } + } else { + final Tuple tuple = indexShard.getRetentionLeases(true); + assertTrue(tuple.v1()); + retentionLeases = tuple.v2(); + } assertRetentionLeases( - indexShard.getEngine().config().retentionLeasesSupplier().get(), - indexShard.indexSettings().getRetentionLeaseMillis(), + retentionLeases, size, minimumRetainingSequenceNumbers, - currentTimeMillisSupplier, primaryTerm, - version, - primary); + version); } private void assertRetentionLeases( final RetentionLeases retentionLeases, - final long retentionLeaseMillis, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier, final long primaryTerm, - final long version, - final boolean primary) { + final long version) { assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); assertThat(retentionLeases.version(), equalTo(version)); final Map idToRetentionLease = new HashMap<>(); @@ -302,10 +313,6 @@ private void assertRetentionLeases( assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); - if (primary) { - // retention leases can be expired on replicas, so we can only assert on primaries here - assertThat(currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), lessThanOrEqualTo(retentionLeaseMillis)); - } assertThat(retentionLease.source(), equalTo("test-" + i)); } } From fb0103794e47640b3a1093f65fefb316cc80e557 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 4 Feb 2019 22:25:18 -0500 Subject: [PATCH 2/2] Lift synchronized --- .../index/seqno/ReplicationTracker.java | 42 +++++++++---------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 22385b7bd2f0c..31f491d24cf9d 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -194,30 +194,26 @@ public RetentionLeases getRetentionLeases() { * * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases */ - public Tuple getRetentionLeases(final boolean expireLeases) { - final RetentionLeases nonExpiredRetentionLeases; - synchronized (this) { - if (expireLeases == false) { - return Tuple.tuple(false, retentionLeases); - } - assert primaryMode; - // the primary calculates the non-expired retention leases and syncs them to replicas - final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); 
- final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); - final Map> partitionByExpiration = retentionLeases - .leases() - .stream() - .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); - if (partitionByExpiration.get(true) == null) { - // early out as no retention leases have expired - return Tuple.tuple(false, retentionLeases); - } - final Collection nonExpiredLeases = - partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList(); - retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); - nonExpiredRetentionLeases = retentionLeases; + public synchronized Tuple getRetentionLeases(final boolean expireLeases) { + if (expireLeases == false) { + return Tuple.tuple(false, retentionLeases); + } + assert primaryMode; + // the primary calculates the non-expired retention leases and syncs them to replicas + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); + final Map> partitionByExpiration = retentionLeases + .leases() + .stream() + .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); + if (partitionByExpiration.get(true) == null) { + // early out as no retention leases have expired + return Tuple.tuple(false, retentionLeases); } - return Tuple.tuple(true, nonExpiredRetentionLeases); + final Collection nonExpiredLeases = + partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList(); + retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); + return Tuple.tuple(true, retentionLeases); } /**