Skip to content

Commit

Permalink
Block older operations on primary term transition
Browse files Browse the repository at this point in the history
Today a replica learns of a new primary term via a cluster state update
and there is not a clean transition between the older primary term and
the newer primary term. This commit modifies this situation so that:
 - a replica shard learns of a new primary term via replication
   operations executed under the mandate of the new primary
 - when a replica shard learns of a new primary term, it blocks
   operations on older terms from reaching the engine, with a clear
   transition point between the operations on the older term and the
   operations on the newer term

This work paves the way for a primary/replica sync on primary
promotion. Future work will also ensure a clean transition point on a
promoted primary, and prepare a replica shard for a sync with the
promoted primary.
  • Loading branch information
jasontedor committed May 19, 2017
1 parent 0aa380b commit 857fcdb
Show file tree
Hide file tree
Showing 13 changed files with 351 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
Expand All @@ -52,7 +51,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -180,7 +178,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

/**
* Synchronous replica operation on nodes with replica copies. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)}
* {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)}
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
Expand Down Expand Up @@ -584,7 +582,7 @@ protected void doRun() throws Exception {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationLock(request.primaryTerm, this, executor);
replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor);
}

/**
Expand Down Expand Up @@ -921,7 +919,7 @@ public void onFailure(Exception e) {
}
};

indexShard.acquirePrimaryOperationLock(onAcquired, executor);
indexShard.acquirePrimaryOperationPermit(onAcquired, executor);
}

class ShardReference implements Releasable {
Expand Down
5 changes: 0 additions & 5 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,6 @@ public synchronized void updateMetaData(final IndexMetaData metadata) {
rescheduleFsyncTask(durability);
}
}

// update primary terms
for (final IndexShard shard : this.shards.values()) {
shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id()));
}
}

private void rescheduleFsyncTask(Translog.Durability durability) {
Expand Down
109 changes: 84 additions & 25 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -131,6 +130,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -197,7 +197,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl

private final ShardPath path;

private final IndexShardOperationsLock indexShardOperationsLock;
private final IndexShardOperationPermits indexShardOperationPermits;

private static final EnumSet<IndexShardState> readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY);
// for primaries, we only allow to write when actually started (so the cluster has decided we started)
Expand Down Expand Up @@ -274,7 +274,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP
}
this.cachingPolicy = cachingPolicy;
}
indexShardOperationsLock = new IndexShardOperationsLock(shardId, logger, threadPool);
indexShardOperationPermits = new IndexShardOperationPermits(shardId, logger, threadPool);
searcherWrapper = indexSearcherWrapper;
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
refreshListeners = buildRefreshListeners();
Expand Down Expand Up @@ -330,7 +330,6 @@ public ShardFieldData fieldData() {
return this.shardFieldData;
}


/**
* Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
*/
Expand All @@ -342,6 +341,7 @@ public long getPrimaryTerm() {
* notifies the shard of an increase in the primary term
*/
public void updatePrimaryTerm(final long newTerm) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newTerm != primaryTerm) {
// Note that due to cluster state batching an initializing primary shard can fail and be re-assigned
Expand All @@ -356,10 +356,13 @@ public void updatePrimaryTerm(final long newTerm) {
//
// We could fail the shard in that case, but this will cause it to be removed from the insync allocations list
// potentially preventing re-allocation.
assert shardRouting.primary() == false || shardRouting.initializing() == false :
"a started primary shard should never update it's term. shard: " + shardRouting
+ " current term [" + primaryTerm + "] new term [" + newTerm + "]";
assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]";
assert shardRouting.initializing() == false :
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newTerm + "]";
assert newTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]";
primaryTerm = newTerm;
}
}
Expand Down Expand Up @@ -459,9 +462,9 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try {
indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation locks are being held here, move state from started to relocated
assert indexShardOperationsLock.getActiveOperationsCount() == 0 :
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
"in-flight operations in progress while moving shard state to relocated";
synchronized (mutex) {
if (state != IndexShardState.STARTED) {
Expand Down Expand Up @@ -978,7 +981,7 @@ public void close(String reason, boolean flushEngine) throws IOException {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, refreshListeners);
indexShardOperationsLock.close();
indexShardOperationPermits.close();
}
}
}
Expand Down Expand Up @@ -1845,35 +1848,91 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
}

/**
* Acquire a primary operation lock whenever the shard is ready for indexing. If the lock is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, lock acquisition can be delayed. The provided
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*/
public void acquirePrimaryOperationLock(ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
verifyNotClosed();
verifyPrimary();

indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, false);
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false);
}

private final AtomicLong pendingPrimaryTerm = new AtomicLong();

/**
* Acquire a replica operation lock whenever the shard is ready for indexing (see acquirePrimaryOperationLock). If the given primary
* term is lower then the one in {@link #shardRouting} an {@link IllegalArgumentException} is thrown.
* Acquire a replica operation permit whenever the shard is ready for indexing (see
* {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than the one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
*
* @param operationPrimaryTerm the operation primary term
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
*/
public void acquireReplicaOperationLock(long opPrimaryTerm, ActionListener<Releasable> onLockAcquired, String executorOnDelay) {
public void acquireReplicaOperationPermit(
final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
verifyNotClosed();
verifyReplicationTarget();
if (primaryTerm > opPrimaryTerm) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
shardId, opPrimaryTerm, primaryTerm));
if (operationPrimaryTerm > primaryTerm
&& pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) {
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
if (operationPrimaryTerm > primaryTerm) {
primaryTerm = operationPrimaryTerm;
}
});
} catch (final InterruptedException | TimeoutException e) {
onPermitAcquired.onFailure(e);
}
}

indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, true);
final long currentPrimaryTerm = primaryTerm;
if (operationPrimaryTerm == currentPrimaryTerm) {
indexShardOperationPermits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
assert operationPrimaryTerm <= primaryTerm
: "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]";
if (operationPrimaryTerm < primaryTerm) {
releasable.close();
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, primaryTerm, onPermitAcquired);
} else {
onPermitAcquired.onResponse(releasable);
}
}

@Override
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
},
executorOnDelay,
true);
} else {
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, currentPrimaryTerm, onPermitAcquired);
}
}

/**
 * Completes the given listener with an {@link IllegalStateException} stating that the operation's primary term
 * is older than the primary term it was compared against.
 *
 * @param shardId              the shard the operation targeted; formatted at the start of the message
 * @param operationPrimaryTerm the primary term carried by the rejected operation
 * @param primaryTerm          the primary term the operation was compared against
 * @param onPermitAcquired     the listener to notify of the failure
 */
private static void onOperationPrimaryTermIsTooOld(
        final ShardId shardId,
        final long operationPrimaryTerm,
        final long primaryTerm,
        final ActionListener<Releasable> onPermitAcquired) {
    // Locale.ROOT keeps the number formatting independent of the default locale
    final IllegalStateException termTooOld = new IllegalStateException(
            String.format(
                    Locale.ROOT,
                    "%s operation primary term [%d] is too old (current [%d])",
                    shardId, operationPrimaryTerm, primaryTerm));
    onPermitAcquired.onFailure(termTooOld);
}

public int getActiveOperationsCount() {
return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
}

private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.shard;

import org.apache.logging.log4j.Logger;
Expand All @@ -36,18 +37,18 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class IndexShardOperationsLock implements Closeable {
final class IndexShardOperationPermits implements Closeable {
private final ShardId shardId;
private final Logger logger;
private final ThreadPool threadPool;

private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
// fair semaphore to ensure that blockOperations() does not starve under thread contention
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed due to relocation hand-off
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed
private volatile boolean closed;

public IndexShardOperationsLock(ShardId shardId, Logger logger, ThreadPool threadPool) {
IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) {
this.shardId = shardId;
this.logger = logger;
this.threadPool = threadPool;
Expand All @@ -67,14 +68,15 @@ public void close() {
* @param onBlocked the action to run once the block has been acquired
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation lock has been closed
* @throws IndexShardClosedException if operation permit has been closed
*/
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
if (closed) {
throw new IndexShardClosedException(shardId);
}
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
Expand All @@ -91,7 +93,7 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked)
}
if (queuedActions != null) {
// Try acquiring permits on fresh thread (for two reasons):
// - blockOperations is called on recovery thread which can be expected to be interrupted when recovery is cancelled.
// - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
// Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
// ThreadedActionListener if the queue of the thread pool on which it submits is full.
// - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
Expand All @@ -106,14 +108,14 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked)
}

/**
* Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided
* ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock
* acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations
* terminates.
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)},
* permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no
* longer blocked.
*
* @param onAcquired ActionListener that is invoked once acquisition is successful or failed
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
* @param forceExecution whether the runnable should force its execution in case it gets rejected
*/
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
if (closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
shard.updateAllocationIdsFromMaster(activeIds, initializingIds);
}
} catch (Exception e) {
Expand Down Expand Up @@ -737,6 +738,13 @@ public interface Shard {
*/
void updateRoutingEntry(ShardRouting shardRouting) throws IOException;

/**
* Update the primary term. This method should only be invoked on primary shards.
*
* @param primaryTerm the new primary term
*/
void updatePrimaryTerm(long primaryTerm);

/**
* Notifies the service of the current allocation ids in the cluster state.
* See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details.
Expand Down
Loading

0 comments on commit 857fcdb

Please sign in to comment.