diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 134d130fddcbe..279a616160000 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -189,7 +189,7 @@ public synchronized void onSuccess(boolean forcedRefresh) { /** * Result of taking the action on the replica. */ - protected static class WriteReplicaResult> + public static class WriteReplicaResult> extends ReplicaResult implements RespondingWriteResult { public final Location location; boolean finishedAsyncActions; diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 3eaad1eee5460..604d27a1e70c8 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexShard; @@ -310,7 +311,11 @@ private long getAvgShardSizeInBytes() throws IOException { } } - public synchronized IndexShard createShard(ShardRouting routing, Consumer globalCheckpointSyncer) throws IOException { + public synchronized IndexShard createShard( + final ShardRouting routing, + final Consumer globalCheckpointSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException { + Objects.requireNonNull(retentionLeaseSyncer); /* * TODO: we execute this in parallel but 
it's a synced method. Yet, we might * be able to serialize the execution via the cluster state in the future. for now we just @@ -398,6 +403,7 @@ public synchronized IndexShard createShard(ShardRouting routing, Consumer globalCheckpointSyncer.accept(shardId), + (retentionLeases, listener) -> retentionLeaseSyncer.syncRetentionLeasesForShard(shardId, retentionLeases, listener), circuitBreakerService); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index f309512ec98b6..85d5f6f62c61d 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -22,6 +22,8 @@ import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.ObjectLongMap; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -35,6 +37,7 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -43,6 +46,7 @@ import java.util.Objects; import java.util.OptionalLong; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -142,6 +146,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private final LongSupplier currentTimeMillisSupplier; + /** + * A callback when 
a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync + * retention leases to replicas. + */ + private final BiConsumer, ActionListener> onNewRetentionLease; + /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the * current global checkpoint. @@ -156,7 +166,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final Map retentionLeases = new HashMap<>(); /** - * Get all non-expired retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned. + * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. * * @return the retention leases */ @@ -174,15 +184,60 @@ public synchronized Collection getRetentionLeases() { } /** - * Adds a new or updates an existing retention lease. + * Adds a new retention lease. + * + * @param id the identifier of the retention lease + * @param retainingSequenceNumber the retaining sequence number + * @param source the source of the retention lease + * @param listener the callback when the retention lease is successfully added and synced to replicas + * @return the new retention lease + * @throws IllegalArgumentException if the specified retention lease already exists + */ + public RetentionLease addRetentionLease( + final String id, + final long retainingSequenceNumber, + final String source, + final ActionListener listener) { + Objects.requireNonNull(listener); + final RetentionLease retentionLease; + final Collection currentRetentionLeases; + synchronized (this) { + assert primaryMode; + if (retentionLeases.containsKey(id)) { + throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists"); + } + retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); + retentionLeases.put(id, 
retentionLease); + currentRetentionLeases = new ArrayList<>(retentionLeases.values()); /* snapshot taken while holding the lock: values() is a live view of the map */ + } + onNewRetentionLease.accept(Collections.unmodifiableCollection(currentRetentionLeases), listener); /* callback runs outside the lock, on the snapshot only */ + return retentionLease; + } + + /** + * Renews an existing retention lease. * * @param id the identifier of the retention lease * @param retainingSequenceNumber the retaining sequence number * @param source the source of the retention lease + * @return the renewed retention lease + * @throws IllegalArgumentException if the specified retention lease does not exist */ - public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) { + public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert primaryMode; - retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source)); + if (retentionLeases.containsKey(id) == false) { + throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist"); + } + final RetentionLease retentionLease = + new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source); + final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease); + assert existingRetentionLease != null; + assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() : + "retention lease renewal for [" + id + "]" + + " from [" + source + "]" + + " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]" + + " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]"; + return retentionLease; } /** @@ -440,10 +495,11 @@ private static long inSyncCheckpointStates( * Initialize the global checkpoint service. 
The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. * - * @param shardId the shard ID - * @param allocationId the allocation ID - * @param indexSettings the index settings - * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} + * @param shardId the shard ID + * @param allocationId the allocation ID + * @param indexSettings the index settings + * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} + * @param onNewRetentionLease a callback when a new retention lease is created */ public ReplicationTracker( final ShardId shardId, @@ -451,7 +507,8 @@ public ReplicationTracker( final IndexSettings indexSettings, final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, - final LongSupplier currentTimeMillisSupplier) { + final LongSupplier currentTimeMillisSupplier, + final BiConsumer, ActionListener> onNewRetentionLease) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -462,6 +519,7 @@ public ReplicationTracker( checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); + this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java new file mode 100644 index 0000000000000..3b7df41f72d05 --- /dev/null 
+++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import 
org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +/** + * Write action responsible for syncing retention leases to replicas. This action is deliberately a write action so that if a replica misses + * a retention lease sync then that shard will be marked as stale. + */ +public class RetentionLeaseSyncAction extends + TransportWriteAction { + + public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync"; /* passed to the superclass constructor as the action name; final so it cannot be reassigned */ + + private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class); + + protected Logger getLogger() { + return LOGGER; + } + + @Inject + public RetentionLeaseSyncAction( + final Settings settings, + final TransportService transportService, + final ClusterService clusterService, + final IndicesService indicesService, + final ThreadPool threadPool, + final ShardStateAction shardStateAction, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver) { + super( + settings, + ACTION_NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + indexNameExpressionResolver, + RetentionLeaseSyncAction.Request::new, + RetentionLeaseSyncAction.Request::new, + ThreadPool.Names.MANAGEMENT); + } + + /** + * Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails. 
+ * + * @param shardId the shard to sync + * @param retentionLeases the retention leases to sync + * @param listener the callback to invoke when the sync completes normally or abnormally + */ + public void syncRetentionLeasesForShard( + final ShardId shardId, + final Collection retentionLeases, + final ActionListener listener) { + Objects.requireNonNull(shardId); + Objects.requireNonNull(retentionLeases); + Objects.requireNonNull(listener); + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the sync is authorized + threadContext.markAsSystemContext(); + execute( + new RetentionLeaseSyncAction.Request(shardId, retentionLeases), + ActionListener.wrap( + listener::onResponse, + e -> { + if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e); + } + listener.onFailure(e); + })); + } + } + + @Override + protected WritePrimaryResult shardOperationOnPrimary(final Request request, final IndexShard primary) { + Objects.requireNonNull(request); + Objects.requireNonNull(primary); + // we flush to ensure that retention leases are committed + flush(primary); + return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger); + } + + @Override + protected WriteReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) { + Objects.requireNonNull(request); + Objects.requireNonNull(replica); + replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); + // we flush to ensure that retention leases are committed + flush(replica); + return new WriteReplicaResult<>(request, null, null, replica, logger); + } + + private void flush(final IndexShard indexShard) { + final FlushRequest flushRequest = new FlushRequest(); + 
flushRequest.force(true); + flushRequest.waitIfOngoing(true); + indexShard.flush(flushRequest); + } + + public static final class Request extends ReplicatedWriteRequest { + + private Collection retentionLeases; + + public Collection getRetentionLeases() { + return retentionLeases; + } + + public Request() { + + } + + public Request(final ShardId shardId, final Collection retentionLeases) { + super(Objects.requireNonNull(shardId)); + this.retentionLeases = Objects.requireNonNull(retentionLeases); + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + retentionLeases = in.readList(RetentionLease::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(Objects.requireNonNull(out)); + out.writeCollection(retentionLeases); + } + + @Override + public String toString() { + return "Request{" + + "retentionLeases=" + retentionLeases + + ", shardId=" + shardId + + ", timeout=" + timeout + + ", index='" + index + '\'' + + ", waitForActiveShards=" + waitForActiveShards + + '}'; + } + + } + + public static final class Response extends ReplicationResponse implements WriteResponse { + + @Override + public void setForcedRefresh(final boolean forcedRefresh) { + // ignore + } + + } + + @Override + protected Response newResponseInstance() { + return new Response(); + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java new file mode 100644 index 0000000000000..1e276eb98adaf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncer.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Collection; + +/** + * A functional interface that represents a method for syncing retention leases to replica shards after a new retention lease is added on + * the primary. + */ +@FunctionalInterface +public interface RetentionLeaseSyncer { + + /** + * Represents a method that when invoked syncs retention leases to replica shards after a new retention lease is added on the primary. + * The specified listener is invoked when the syncing completes with success or failure. 
+ * + * @param shardId the shard ID + * @param retentionLeases the retention leases to sync + * @param listener the callback when sync completes + */ + void syncRetentionLeasesForShard( + ShardId shardId, + Collection retentionLeases, + ActionListener listener); + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f92eb38349246..dc43d42c94a5c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -40,6 +40,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -266,6 +267,7 @@ public IndexShard( final List searchOperationListener, final List listeners, final Runnable globalCheckpointSyncer, + final BiConsumer, ActionListener> retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -313,7 +315,8 @@ public IndexShard( indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated, - threadPool::absoluteTimeInMillis); + threadPool::absoluteTimeInMillis, + retentionLeaseSyncer); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -1882,18 +1885,61 @@ public void addGlobalCheckpointListener( this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout); } + /** + * Get all non-expired retention leases tracked on 
this shard. An unmodifiable copy of the retention leases is returned. + * + * @return the retention leases + */ + public Collection getRetentionLeases() { + verifyNotClosed(); + return replicationTracker.getRetentionLeases(); + } /** - * Adds a new or updates an existing retention lease. + * Adds a new retention lease. * * @param id the identifier of the retention lease * @param retainingSequenceNumber the retaining sequence number * @param source the source of the retention lease + * @param listener the callback when the retention lease is successfully added and synced to replicas + * @return the new retention lease + * @throws IllegalArgumentException if the specified retention lease already exists + */ + public RetentionLease addRetentionLease( + final String id, + final long retainingSequenceNumber, + final String source, + final ActionListener listener) { + Objects.requireNonNull(listener); + assert assertPrimaryMode(); + verifyNotClosed(); + return replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, listener); + } + + /** + * Renews an existing retention lease. + * + * @param id the identifier of the retention lease + * @param retainingSequenceNumber the retaining sequence number + * @param source the source of the retention lease + * @return the renewed retention lease + * @throws IllegalArgumentException if the specified retention lease does not exist */ - void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) { + public RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert assertPrimaryMode(); verifyNotClosed(); - replicationTracker.addOrUpdateRetentionLease(id, retainingSequenceNumber, source); + return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source); + } + + /** + * Updates retention leases on a replica. 
+ * + * @param retentionLeases the retention leases + */ + public void updateRetentionLeasesOnReplica(final Collection retentionLeases) { + assert assertReplicationTarget(); + verifyNotClosed(); + replicationTracker.updateRetentionLeasesOnReplica(retentionLeases); } /** diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index b881ba73a28e6..fa42776403dca 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -94,6 +94,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexEventListener; @@ -592,10 +593,12 @@ public IndexShard createShard( final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, final Consumer onShardFailure, - final Consumer globalCheckpointSyncer) throws IOException { + final Consumer globalCheckpointSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException { + Objects.requireNonNull(retentionLeaseSyncer); ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); - IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> { diff --git 
a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index c8afe92be8d37..80ac05ece8274 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -56,6 +56,8 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; @@ -83,6 +85,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -121,6 +124,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final List buildInIndexListener; private final PrimaryReplicaSyncer primaryReplicaSyncer; private final Consumer globalCheckpointSyncer; + private final RetentionLeaseSyncer retentionLeaseSyncer; @Inject public IndicesClusterStateService( @@ -137,7 +141,8 @@ public IndicesClusterStateService( final PeerRecoverySourceService peerRecoverySourceService, final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, - final GlobalCheckpointSyncAction globalCheckpointSyncAction) { + final GlobalCheckpointSyncAction globalCheckpointSyncAction, + final RetentionLeaseSyncAction retentionLeaseSyncAction) { this( settings, (AllocatedIndices>) indicesService, @@ -152,7 +157,8 @@ public IndicesClusterStateService( peerRecoverySourceService, 
snapshotShardsService, primaryReplicaSyncer, - globalCheckpointSyncAction::updateGlobalCheckpointForShard); + globalCheckpointSyncAction::updateGlobalCheckpointForShard, + Objects.requireNonNull(retentionLeaseSyncAction)::syncRetentionLeasesForShard); } // for tests @@ -170,7 +176,8 @@ public IndicesClusterStateService( final PeerRecoverySourceService peerRecoverySourceService, final SnapshotShardsService snapshotShardsService, final PrimaryReplicaSyncer primaryReplicaSyncer, - final Consumer globalCheckpointSyncer) { + final Consumer globalCheckpointSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer) { this.settings = settings; this.buildInIndexListener = Arrays.asList( @@ -188,6 +195,7 @@ public IndicesClusterStateService( this.repositoriesService = repositoriesService; this.primaryReplicaSyncer = primaryReplicaSyncer; this.globalCheckpointSyncer = globalCheckpointSyncer; + this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); } @@ -576,7 +584,8 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR new RecoveryListener(shardRouting), repositoriesService, failedShardHandler, - globalCheckpointSyncer); + globalCheckpointSyncer, + retentionLeaseSyncer); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); } @@ -870,6 +879,7 @@ U createIndex(IndexMetaData indexMetaData, * @param repositoriesService service responsible for snapshot/restore * @param onShardFailure a callback when this shard fails * @param globalCheckpointSyncer a callback when this shard syncs the global checkpoint + * @param retentionLeaseSyncer a callback when this shard syncs retention leases * @return a new shard * @throws IOException if an I/O exception occurs when creating the shard */ @@ -880,7 +890,8 @@ T createShard( PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService 
repositoriesService, Consumer onShardFailure, - Consumer globalCheckpointSyncer) throws IOException; + Consumer globalCheckpointSyncer, + RetentionLeaseSyncer retentionLeaseSyncer) throws IOException; /** * Returns shard for the specified id if it exists otherwise returns null. diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 2854cc87d8695..3dafb93d65400 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -30,8 +31,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; +import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.equalTo; @@ -41,39 +45,83 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase { - public void testAddOrUpdateRetentionLease() { - final AllocationId id = AllocationId.newInitializing(); + public void testAddOrRenewRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); final ReplicationTracker replicationTracker = new ReplicationTracker( new ShardId("test", "_na", 0), - id.getId(), + allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO, value -> {}, - () 
-> 0L); + () -> 0L, + (leases, listener) -> {}); replicationTracker.updateFromMaster( randomNonNegativeLong(), - Collections.singleton(id.getId()), - routingTable(Collections.emptySet(), id), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), Collections.emptySet()); replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); final int length = randomIntBetween(0, 8); final long[] minimumRetainingSequenceNumbers = new long[length]; for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + replicationTracker.addRetentionLease( + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); - replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L); } + } + + public void testOnNewRetentionLease() { + final AllocationId allocationId = AllocationId.newInitializing(); + final Map retentionLeases = new HashMap<>(); + final AtomicBoolean invoked = new AtomicBoolean(); + final AtomicReference reference = new AtomicReference<>(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, 
listener) -> { + // we do not want to hold a lock on the replication tracker in the callback! + assertFalse(Thread.holdsLock(reference.get())); + invoked.set(true); + assertThat( + leases.stream().collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)), + equalTo(retentionLeases)); + }); + reference.set(replicationTracker); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + final int length = randomIntBetween(0, 8); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + retentionLeases.put(id, retainingSequenceNumber); + replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); + // assert that the new retention lease callback was invoked + assertTrue(invoked.get()); + // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease + invoked.set(false); + replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); + assertFalse(invoked.get()); + } } public void testExpiration() { - final AllocationId id = AllocationId.newInitializing(); + final AllocationId allocationId = AllocationId.newInitializing(); final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); final Settings settings = Settings @@ -82,20 +130,21 @@ public void testExpiration() { .build(); final ReplicationTracker replicationTracker = new ReplicationTracker( new ShardId("test", "_na", 0), - id.getId(), + allocationId.getId(), IndexSettingsModule.newIndexSettings("test", settings), 
UNASSIGNED_SEQ_NO, value -> {}, - currentTimeMillis::get); + currentTimeMillis::get, + (leases, listener) -> {}); replicationTracker.updateFromMaster( randomNonNegativeLong(), - Collections.singleton(id.getId()), - routingTable(Collections.emptySet(), id), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), Collections.emptySet()); replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); { final Collection retentionLeases = replicationTracker.getRetentionLeases(); @@ -108,7 +157,7 @@ public void testExpiration() { // renew the lease currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024)); retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE); - replicationTracker.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); { final Collection retentionLeases = replicationTracker.getRetentionLeases(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java index 9b1f951a030fe..a36006a5fc4c1 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -47,7 +47,8 @@ ReplicationTracker newTracker( IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), UNASSIGNED_SEQ_NO, updatedGlobalCheckpoint, - currentTimeMillisSupplier); + 
currentTimeMillisSupplier, + (leases, listener) -> {}); } static IndexShardRoutingTable routingTable(final Set initializingIds, final AllocationId primaryId) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index 001e50af57c79..b61e3f647b9d2 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -47,6 +49,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.stream.Collectors; @@ -683,10 +686,12 @@ public void testPrimaryContextHandoff() throws IOException { final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; final long globalCheckpoint = UNASSIGNED_SEQ_NO; + final BiConsumer, ActionListener> onNewRetentionLease = + (leases, listener) -> {}; ReplicationTracker oldPrimary = new ReplicationTracker( - shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L); + shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); ReplicationTracker newPrimary = new ReplicationTracker( - shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, 
onUpdate, () -> 0L); + shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java new file mode 100644 index 0000000000000..898309524b4f0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. 
+ */ + +package org.elasticsearch.index.seqno; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.mockito.ArgumentCaptor; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; +import static org.elasticsearch.mock.orig.Mockito.when; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + 
+public class RetentionLeaseSyncActionTests extends ESTestCase { + + private ThreadPool threadPool; + private CapturingTransport transport; + private ClusterService clusterService; + private TransportService transportService; + private ShardStateAction shardStateAction; + + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(getClass().getName()); + transport = new CapturingTransport(); + clusterService = createClusterService(threadPool); + transportService = transport.createTransportService(clusterService.getSettings(), threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null, Collections.emptySet()); + transportService.start(); + transportService.acceptIncomingRequests(); + shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); + } + + public void tearDown() throws Exception { + try { + IOUtils.close(transportService, clusterService, transport); + } finally { + terminate(threadPool); + } + super.tearDown(); + } + + public void testRetentionLeaseSyncActionOnPrimary() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver()); + @SuppressWarnings("unchecked") final Collection retentionLeases = + (Collection) 
mock(Collection.class); + final RetentionLeaseSyncAction.Request request = + new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); + + final TransportWriteAction.WritePrimaryResult result = + action.shardOperationOnPrimary(request, indexShard); + // the retention leases on the shard should be flushed + final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); + verify(indexShard).flush(flushRequest.capture()); + assertTrue(flushRequest.getValue().force()); + assertTrue(flushRequest.getValue().waitIfOngoing()); + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + // we should start with an empty replication response + assertNull(result.finalResponseIfSuccessful.getShardInfo()); + } + + public void testRetentionLeaseSyncActionOnReplica() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver()); + @SuppressWarnings("unchecked") final Collection retentionLeases = + (Collection) mock(Collection.class); + final RetentionLeaseSyncAction.Request request = + new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); + + final TransportWriteAction.WriteReplicaResult result = 
action.shardOperationOnReplica(request, indexShard); + // the retention leases on the shard should be updated + verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); + // the retention leases on the shard should be flushed + final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); + verify(indexShard).flush(flushRequest.capture()); + assertTrue(flushRequest.getValue().force()); + assertTrue(flushRequest.getValue().waitIfOngoing()); + // the result should indicate success + final AtomicBoolean success = new AtomicBoolean(); + result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); + assertTrue(success.get()); + } + + public void testRetentionLeaseSyncExecution() { + final IndicesService indicesService = mock(IndicesService.class); + + final Index index = new Index("index", "uuid"); + final IndexService indexService = mock(IndexService.class); + when(indicesService.indexServiceSafe(index)).thenReturn(indexService); + + final int id = randomIntBetween(0, 4); + final IndexShard indexShard = mock(IndexShard.class); + when(indexService.getShard(id)).thenReturn(indexShard); + + final ShardId shardId = new ShardId(index, id); + when(indexShard.shardId()).thenReturn(shardId); + + final Logger retentionLeaseSyncActionLogger = mock(Logger.class); + + @SuppressWarnings("unchecked") final Collection retentionLeases = + (Collection) mock(Collection.class); + final AtomicBoolean invoked = new AtomicBoolean(); + final RetentionLeaseSyncAction action = new RetentionLeaseSyncAction( + Settings.EMPTY, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver()) { + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + assertTrue(threadPool.getThreadContext().isSystemContext()); + assertThat(request.shardId(), sameInstance(indexShard.shardId())); + 
assertThat(request.getRetentionLeases(), sameInstance(retentionLeases)); + if (randomBoolean()) { + listener.onResponse(new Response()); + } else { + final Exception e = randomFrom( + new AlreadyClosedException("closed"), + new IndexShardClosedException(indexShard.shardId()), + new RuntimeException("failed")); + listener.onFailure(e); + if (e instanceof AlreadyClosedException == false && e instanceof IndexShardClosedException == false) { + final ArgumentCaptor captor = ArgumentCaptor.forClass(ParameterizedMessage.class); + verify(retentionLeaseSyncActionLogger).warn(captor.capture(), same(e)); + final ParameterizedMessage message = captor.getValue(); + assertThat(message.getFormat(), equalTo("{} retention lease sync failed")); + assertThat(message.getParameters(), arrayContaining(indexShard.shardId())); + } + verifyNoMoreInteractions(retentionLeaseSyncActionLogger); + } + invoked.set(true); + } + + @Override + protected Logger getLogger() { + return retentionLeaseSyncActionLogger; + } + }; + + // execution happens on the test thread, so no need to register an actual listener to callback + action.syncRetentionLeasesForShard(indexShard.shardId(), retentionLeases, ActionListener.wrap(() -> {})); + assertTrue(invoked.get()); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java new file mode 100644 index 0000000000000..fad9e25db12d6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +public class RetentionLeaseSyncIT extends ESIntegTestCase { + + public void testRetentionLeasesSyncedOnAdd() throws Exception { + final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .build(); + createIndex("index", settings); + ensureGreen("index"); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, 
primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + // we will add multiple retention leases and expect to see them synced to all replicas + final int length = randomIntBetween(1, 8); + final Map currentRetentionLeases = new HashMap<>(); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); + latch.await(); + + // check retention leases have been committed on the primary + final Collection primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( + primary.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); + assertThat(currentRetentionLeases, equalTo(toMap(primaryCommittedRetentionLeases))); + + // check current retention leases have been synced to all replicas + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + final Map retentionLeasesOnReplica = toMap(replica.getRetentionLeases()); + assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); + + // check retention leases have been committed on the replica + final Collection 
replicaCommittedRetentionLeases = RetentionLease.decodeRetentionLeases( + replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); + assertThat(currentRetentionLeases, equalTo(toMap(replicaCommittedRetentionLeases))); + } + } + } + + private static Map toMap(final Collection replicaCommittedRetentionLeases) { + return replicaCommittedRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 58c4844a11bfc..36560dd96c627 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -663,6 +663,7 @@ public static final IndexShard newIndexShard( Collections.emptyList(), Arrays.asList(listeners), () -> {}, + (leases, listener) -> {}, cbs); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index d0018a0a864f8..eff1edfed52ba 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.SegmentInfos; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRoutingHelper; @@ -71,20 +72,21 @@ protected void tearDownThreadPool() { } - public void testAddOrUpdateRetentionLease() throws IOException { + public void testAddOrRenewRetentionLease() throws IOException { final IndexShard indexShard = newStartedShard(true); try { final int 
length = randomIntBetween(0, 8); final long[] minimumRetainingSequenceNumbers = new long[length]; for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + indexShard.addRetentionLease( + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); - indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L); } } finally { @@ -103,7 +105,7 @@ public void testExpiration() throws IOException { try { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - indexShard.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); { final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); @@ -116,7 +118,7 @@ public void testExpiration() throws IOException { // renew the lease currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024)); retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE); - indexShard.addOrUpdateRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); { final Collection 
retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); @@ -150,7 +152,8 @@ public void testCommit() throws IOException { for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong())); - indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + indexShard.addRetentionLease( + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); } currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE)); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 769cdfc8a9b53..d240bb01fefb1 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -130,7 +130,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting = newRouting.moveToUnassigned(unassignedInfo) .updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE); newRouting = ShardRoutingHelper.initialize(newRouting, nodeId); - IndexShard shard = index.createShard(newRouting, s -> {}); + IndexShard shard = index.createShard(newRouting, s -> {}, (s, leases, listener) -> {}); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 
b4c3d65115155..f248d46b11744 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.RetentionLeaseSyncer; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; @@ -233,7 +234,8 @@ public MockIndexShard createShard( final PeerRecoveryTargetService.RecoveryListener recoveryListener, final RepositoriesService repositoriesService, final Consumer onShardFailure, - final Consumer globalCheckpointSyncer) throws IOException { + final Consumer globalCheckpointSyncer, + final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException { failRandomly(); MockIndexService indexService = indexService(recoveryState.getShardId().getIndex()); MockIndexShard indexShard = indexService.createShard(shardRouting); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 9dd8d5c5b660d..b400b56b34d55 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -480,7 +480,8 @@ private IndicesClusterStateService createIndicesClusterStateService(DiscoveryNod null, null, primaryReplicaSyncer, - s -> {}); + s -> {}, + (s, leases, listener) -> {}); } private class RecordingIndicesService extends MockIndicesService { diff --git 
a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 24f97b67c1458..8b750939238cb 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -82,6 +82,7 @@ import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; +import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -538,6 +539,15 @@ protected void assertSnapshotOrGenericThread() { actionFilters, indexNameExpressionResolver)), new GlobalCheckpointSyncAction( + settings, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + indexNameExpressionResolver), + new RetentionLeaseSyncAction( settings, transportService, clusterService, diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 1e3dbef92c30a..35667b0f87a1c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -642,7 +642,7 @@ public EngineConfig config( final CircuitBreakerService breakerService) { final IndexWriterConfig iwc = newIndexWriterConfig(); final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - final Engine.EventListener listener = new Engine.EventListener() {}; // we don't need to notify anybody in this test + final Engine.EventListener eventListener = 
new Engine.EventListener() {}; // we don't need to notify anybody in this test final List extRefreshListenerList = externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); final List intRefreshListenerList = @@ -652,7 +652,13 @@ public EngineConfig config( if (maybeGlobalCheckpointSupplier == null) { assert maybeRetentionLeasesSupplier == null; final ReplicationTracker replicationTracker = new ReplicationTracker( - shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L); + shardId, + allocationId.getId(), + indexSettings, + SequenceNumbers.NO_OPS_PERFORMED, + update -> {}, + () -> 0L, + (leases, listener) -> {}); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; } else { @@ -671,7 +677,7 @@ public EngineConfig config( iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), - listener, + eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 10c9f399d4cbb..acb4911f9c69d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -385,6 +385,7 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe Collections.emptyList(), Arrays.asList(listeners), globalCheckpointSyncer, + (leases, listener) -> {}, breakerService); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index ca8318212c9ee..4deff6870144c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -25,6 +25,7 @@ public final class SystemPrivilege extends Privilege { "indices:admin/template/put", // needed for the TemplateUpgradeService "indices:admin/template/delete", // needed for the TemplateUpgradeService "indices:admin/seq_no/global_checkpoint_sync*", // needed for global checkpoint syncs + "indices:admin/seq_no/retention_lease_sync*", // needed for retention lease syncs "indices:admin/settings/update" // needed for DiskThresholdMonitor.markIndicesReadOnly ); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java index 1484e7a878141..77be9f3b1b1f3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/privilege/PrivilegeTests.java @@ -127,6 +127,9 @@ public void testSystem() throws Exception { assertThat(predicate.test("indices:admin/seq_no/global_checkpoint_sync"), is(true)); assertThat(predicate.test("indices:admin/seq_no/global_checkpoint_sync[p]"), is(true)); assertThat(predicate.test("indices:admin/seq_no/global_checkpoint_sync[r]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync[p]"), is(true)); + assertThat(predicate.test("indices:admin/seq_no/retention_lease_sync[r]"), is(true)); 
assertThat(predicate.test("indices:admin/settings/update"), is(true)); assertThat(predicate.test("indices:admin/settings/foo"), is(false)); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 2a9d832f0f012..dccc8f3ce587a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -247,8 +247,15 @@ public void testActionsForSystemUserIsAuthorized() { // A failure would throw an exception final Authentication authentication = createAuthentication(SystemUser.INSTANCE); - final String[] actions = { "indices:monitor/whatever", "internal:whatever", "cluster:monitor/whatever", "cluster:admin/reroute", - "indices:admin/mapping/put", "indices:admin/template/put", "indices:admin/seq_no/global_checkpoint_sync", + final String[] actions = { + "indices:monitor/whatever", + "internal:whatever", + "cluster:monitor/whatever", + "cluster:admin/reroute", + "indices:admin/mapping/put", + "indices:admin/template/put", + "indices:admin/seq_no/global_checkpoint_sync", + "indices:admin/seq_no/retention_lease_sync", "indices:admin/settings/update" }; for (String action : actions) { authorize(authentication, action, request);