Skip to content

Commit

Permalink
Introduce retention lease syncing (#37398)
Browse files Browse the repository at this point in the history
This commit introduces retention lease syncing from the primary to its
replicas when a new retention lease is added. A follow-up commit will
add a background sync of the retention leases as well so that renewed
retention leases are synced to replicas.
  • Loading branch information
jasontedor committed Jan 27, 2019
1 parent cb13447 commit 5fddb63
Show file tree
Hide file tree
Showing 24 changed files with 855 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public synchronized void onSuccess(boolean forcedRefresh) {
/**
* Result of taking the action on the replica.
*/
protected static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
public static class WriteReplicaResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>>
extends ReplicaResult implements RespondingWriteResult {
public final Location location;
boolean finishedAsyncActions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -310,7 +311,11 @@ private long getAvgShardSizeInBytes() throws IOException {
}
}

public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardId> globalCheckpointSyncer) throws IOException {
public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
Expand Down Expand Up @@ -398,6 +403,7 @@ public synchronized IndexShard createShard(ShardRouting routing, Consumer<ShardI
searchOperationListeners,
indexingOperationListeners,
() -> globalCheckpointSyncer.accept(shardId),
(retentionLeases, listener) -> retentionLeaseSyncer.syncRetentionLeasesForShard(shardId, retentionLeases, listener),
circuitBreakerService);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -35,6 +37,7 @@
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -43,6 +46,7 @@
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -142,6 +146,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private final LongSupplier currentTimeMillisSupplier;

/**
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
* retention leases to replicas.
*/
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
* current global checkpoint.
Expand All @@ -156,7 +166,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final Map<String, RetentionLease> retentionLeases = new HashMap<>();

/**
* Get all non-expired retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
*
* @return the retention leases
*/
Expand All @@ -174,15 +184,60 @@ public synchronized Collection<RetentionLease> getRetentionLeases() {
}

/**
* Adds a new or updates an existing retention lease.
* Adds a new retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @param listener the callback when the retention lease is successfully added and synced to replicas
* @return the new retention lease
* @throws IllegalArgumentException if the specified retention lease already exists
*/
public RetentionLease addRetentionLease(
final String id,
final long retainingSequenceNumber,
final String source,
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLease retentionLease;
final Collection<RetentionLease> currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.containsKey(id)) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists");
}
retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
retentionLeases.put(id, retentionLease);
currentRetentionLeases = retentionLeases.values();
}
onNewRetentionLease.accept(Collections.unmodifiableCollection(new ArrayList<>(currentRetentionLeases)), listener);
return retentionLease;
}

/**
* Renews an existing retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @return the renewed retention lease
* @throws IllegalArgumentException if the specified retention lease does not exist
*/
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
if (retentionLeases.containsKey(id) == false) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
}
final RetentionLease retentionLease =
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease);
assert existingRetentionLease != null;
assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
"retention lease renewal for [" + id + "]"
+ " from [" + source + "]"
+ " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
+ " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
return retentionLease;
}

/**
Expand Down Expand Up @@ -440,18 +495,20 @@ private static long inSyncCheckpointStates(
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
*
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param shardId the shard ID
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onNewRetentionLease a callback when a new retention lease is created
*/
public ReplicationTracker(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier) {
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand All @@ -462,6 +519,7 @@ public ReplicationTracker(
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.seqno;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collection;
import java.util.Objects;

/**
* Write action responsible for syncing retention leases to replicas. This action is deliberately a write action so that if a replica misses
* a retention lease sync then that shard will be marked as stale.
*/
public class RetentionLeaseSyncAction extends
TransportWriteAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> {

public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";

private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

protected Logger getLogger() {
return LOGGER;
}

@Inject
public RetentionLeaseSyncAction(
final Settings settings,
final TransportService transportService,
final ClusterService clusterService,
final IndicesService indicesService,
final ThreadPool threadPool,
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(
settings,
ACTION_NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
indexNameExpressionResolver,
RetentionLeaseSyncAction.Request::new,
RetentionLeaseSyncAction.Request::new,
ThreadPool.Names.MANAGEMENT);
}

/**
* Sync the specified retention leases for the specified shard. The callback is invoked when the sync succeeds or fails.
*
* @param shardId the shard to sync
* @param retentionLeases the retention leases to sync
* @param listener the callback to invoke when the sync completes normally or abnormally
*/
public void syncRetentionLeasesForShard(
final ShardId shardId,
final Collection<RetentionLease> retentionLeases,
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(shardId);
Objects.requireNonNull(retentionLeases);
Objects.requireNonNull(listener);
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
execute(
new RetentionLeaseSyncAction.Request(shardId, retentionLeases),
ActionListener.wrap(
listener::onResponse,
e -> {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
}
listener.onFailure(e);
}));
}
}

@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
// we flush to ensure that retention leases are committed
flush(primary);
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
}

@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
// we flush to ensure that retention leases are committed
flush(replica);
return new WriteReplicaResult<>(request, null, null, replica, logger);
}

private void flush(final IndexShard indexShard) {
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(true);
flushRequest.waitIfOngoing(true);
indexShard.flush(flushRequest);
}

public static final class Request extends ReplicatedWriteRequest<Request> {

private Collection<RetentionLease> retentionLeases;

public Collection<RetentionLease> getRetentionLeases() {
return retentionLeases;
}

public Request() {

}

public Request(final ShardId shardId, final Collection<RetentionLease> retentionLeases) {
super(Objects.requireNonNull(shardId));
this.retentionLeases = Objects.requireNonNull(retentionLeases);
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
retentionLeases = in.readList(RetentionLease::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(Objects.requireNonNull(out));
out.writeCollection(retentionLeases);
}

@Override
public String toString() {
return "Request{" +
"retentionLeases=" + retentionLeases +
", shardId=" + shardId +
", timeout=" + timeout +
", index='" + index + '\'' +
", waitForActiveShards=" + waitForActiveShards +
'}';
}

}

public static final class Response extends ReplicationResponse implements WriteResponse {

@Override
public void setForcedRefresh(final boolean forcedRefresh) {
// ignore
}

}

@Override
protected Response newResponseInstance() {
return new Response();
}

}
Loading

0 comments on commit 5fddb63

Please sign in to comment.