Skip to content

Commit

Permalink
Block older operations on primary term transition
Browse files Browse the repository at this point in the history
Today a replica learns of a new primary term via a cluster state update
and there is not a clean transition between the older primary term and
the newer primary term. This commit modifies this situation so that:
 - a replica shard learns of a new primary term via replication
   operations executed under the mandate of the new primary
 - when a replica shard learns of a new primary term, it blocks
   operations on older terms from reaching the engine, with a clear
   transition point between the operations on the older term and the
   operations on the newer term

This work paves the way for a primary/replica sync on primary
promotion. Future work will also ensure a clean transition point on a
promoted primary, and prepare a replica shard for a sync with the
promoted primary.
  • Loading branch information
jasontedor committed May 18, 2017
1 parent 1196dfb commit bf5ab75
Show file tree
Hide file tree
Showing 13 changed files with 323 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
Expand All @@ -52,7 +51,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -180,7 +178,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

/**
* Synchronous replica operation on nodes with replica copies. This is done under the lock from
* {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)}
* {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
Expand Down Expand Up @@ -584,7 +582,7 @@ protected void doRun() throws Exception {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationLock(request.primaryTerm, this, executor);
replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
}

/**
Expand Down Expand Up @@ -921,7 +919,7 @@ public void onFailure(Exception e) {
}
};

indexShard.acquirePrimaryOperationLock(onAcquired, executor);
indexShard.acquirePrimaryOperationPermit(onAcquired, executor);
}

class ShardReference implements Releasable {
Expand Down
5 changes: 0 additions & 5 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,6 @@ public synchronized void updateMetaData(final IndexMetaData metadata) {
rescheduleFsyncTask(durability);
}
}

// update primary terms
for (final IndexShard shard : this.shards.values()) {
shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id()));
}
}

private void rescheduleFsyncTask(Translog.Durability durability) {
Expand Down
107 changes: 82 additions & 25 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -131,6 +130,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -197,7 +197,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

private final ShardPath path;

private final IndexShardOperationsLock indexShardOperationsLock;
private final IndexShardOperationsPermit indexShardOperationsPermit;

private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
// for primaries, we only allow to write when actually started (so the cluster has decided we started)
Expand Down Expand Up @@ -274,7 +274,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP
}
this.cachingPolicy = cachingPolicy;
}
indexShardOperationsLock = new IndexShardOperationsLock(shardId, logger, threadPool);
indexShardOperationsPermit = new IndexShardOperationsPermit(shardId, logger, threadPool);
searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners();
Expand Down Expand Up @@ -330,7 +330,6 @@ public ShardFieldData fieldData() {
return this.shardFieldData;
}


/**
* Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
*/
Expand All @@ -342,6 +341,7 @@ public long getPrimaryTerm() {
* notifies the shard of an increase in the primary term
*/
public void updatePrimaryTerm(final long newTerm) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newTerm != primaryTerm) {
// Note that due to cluster state batching an initializing primary shard can fail and be re-assigned
Expand All @@ -356,10 +356,13 @@ public void updatePrimaryTerm(final long newTerm) {
//
// We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
// potentially preventing re-allocation.
assert shardRouting.primary() == false || shardRouting.initializing() == false :
"a started primary shard should never update it's term. shard: " + shardRouting
+ " current term [" + primaryTerm + "] new term [" + newTerm + "]";
assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]";
assert shardRouting.initializing() == false :
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newTerm + "]";
assert newTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]";
primaryTerm = newTerm;
}
}
Expand Down Expand Up @@ -459,9 +462,9 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation locks are being held here, move state from started to relocated
assert indexShardOperationsLock.getActiveOperationsCount() == 0 :
indexShardOperationsPermit.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationsPermit.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated";
synchronized (mutex) {
if (state != IndexShardState.STARTED) {
Expand Down Expand Up @@ -978,7 +981,7 @@ public void close(String reason, boolean flushEngine) throws IOException {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, refreshListeners);
indexShardOperationsLock.close();
indexShardOperationsPermit.close();
}
}
}
Expand Down Expand Up @@ -1845,35 +1848,89 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
}

/**
* Acquire a primary operation lock whenever the shard is ready for indexing. If the lock is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, lock acquisition can be delayed. The provided
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*/
public void acquirePrimaryOperationLock(ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
verifyNotClosed();
verifyPrimary();

indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, false);
indexShardOperationsPermit.acquire(onPermitAcquired, executorOnDelay, false);
}

private final AtomicLong pendingPrimaryTerm = new AtomicLong();

/**
* Acquire a replica operation lock whenever the shard is ready for indexing (see acquirePrimaryOperationLock). If the given primary
* term is lower than the one in {@link #shardRouting} an {@link IllegalArgumentException} is thrown.
* Acquire a replica operation permit whenever the shard is ready for indexing (see
* {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than the one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
*
* @param operationPrimaryTerm the operation primary term
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
*/
public void acquireReplicaOperationLock(long opPrimaryTerm, ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
public void acquireReplicaOperationPermit(
final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
verifyNotClosed();
verifyReplicationTarget();
if (primaryTerm > opPrimaryTerm) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
shardId, opPrimaryTerm, primaryTerm));
if (operationPrimaryTerm > primaryTerm
&& pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) {
try {
indexShardOperationsPermit.blockOperations(30, TimeUnit.MINUTES, () -> {
if (operationPrimaryTerm > primaryTerm) {
primaryTerm = operationPrimaryTerm;
}
});
} catch (final InterruptedException | TimeoutException e) {
onPermitAcquired.onFailure(e);
}
}

indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, true);
final long currentPrimaryTerm = primaryTerm;
if (operationPrimaryTerm == currentPrimaryTerm) {
indexShardOperationsPermit.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (operationPrimaryTerm < primaryTerm) {
releasable.close();
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, primaryTerm, onPermitAcquired);
} else if (operationPrimaryTerm == primaryTerm) {
onPermitAcquired.onResponse(releasable);
}
}

@Override
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
},
executorOnDelay,
true);
} else {
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, currentPrimaryTerm, onPermitAcquired);
}
}

/**
 * Fails the given listener because an operation carries a primary term that is considered too old.
 * The listener's {@link ActionListener#onFailure(Exception)} is invoked with an {@link IllegalStateException}
 * whose message names the shard, the operation's primary term and the term reported as current.
 *
 * @param shardId              the shard the operation was directed at (used in the message only)
 * @param operationPrimaryTerm the primary term carried by the rejected operation
 * @param primaryTerm          the primary term reported as current in the message
 * @param onPermitAcquired     the listener to notify of the failure
 */
private static void onOperationPrimaryTermIsTooOld(
        final ShardId shardId,
        final long operationPrimaryTerm,
        final long primaryTerm,
        final ActionListener<Releasable> onPermitAcquired) {
    // Locale.ROOT keeps the number formatting independent of the JVM's default locale.
    final IllegalStateException termTooOld = new IllegalStateException(
        String.format(
            Locale.ROOT,
            "%s operation primary term [%d] is too old (current [%d])",
            shardId,
            operationPrimaryTerm,
            primaryTerm));
    onPermitAcquired.onFailure(termTooOld);
}

public int getActiveOperationsCount() {
return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
return indexShardOperationsPermit.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
}

private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class IndexShardOperationsLock implements Closeable {
public class IndexShardOperationsPermit implements Closeable {
private final ShardId shardId;
private final Logger logger;
private final ThreadPool threadPool;
Expand All @@ -47,7 +47,7 @@ public class IndexShardOperationsLock implements Closeable {
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed due to relocation hand-off
private volatile boolean closed;

public IndexShardOperationsLock(ShardId shardId, Logger logger, ThreadPool threadPool) {
public IndexShardOperationsPermit(ShardId shardId, Logger logger, ThreadPool threadPool) {
this.shardId = shardId;
this.logger = logger;
this.threadPool = threadPool;
Expand All @@ -67,14 +67,15 @@ public void close() {
* @param onBlocked the action to run once the block has been acquired
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation lock has been closed
* @throws IndexShardClosedException if operation permit has been closed
*/
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
if (closed) {
throw new IndexShardClosedException(shardId);
}
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
Expand Down Expand Up @@ -106,14 +107,14 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked)
}

/**
* Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided
* ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock
* acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations
* terminates.
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)},
* permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no
* longer blocked.
*
* @param onAcquired ActionListener that is invoked once acquisition is successful or failed
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
* @param forceExecution whether the runnable should force its execution in case it gets rejected
*/
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
if (closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
Expand Down Expand Up @@ -737,6 +738,13 @@ public interface Shard {
*/
void updateRoutingEntry(ShardRouting shardRouting) throws IOException;

/**
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
*/
void updatePrimaryTerm(long primaryTerm);

/**
* Notifies the service of the current allocation ids in the cluster state.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString());
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
Expand All @@ -1103,7 +1103,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
return null;
}).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString());
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
Expand Down
Loading

0 comments on commit bf5ab75

Please sign in to comment.