From 857fcdb23e5ec053a6164d9d0bc187e32435d151 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 17 May 2017 15:46:48 -0400 Subject: [PATCH] Block older operations on primary term transition Today a replica learns of a new primary term via a cluster state update and there is not a clean transition between the older primary term and the newer primary term. This commit modifies this situation so that: - a replica shard learns of a new primary term via replication operations executed under the mandate of the new primary - when a replica shard learns of a new primary term, it blocks operations on older terms from reaching the engine, with a clear transition point between the operations on the older term and the operations on the newer term This work paves the way for a primary/replica sync on primary promotion. Future work will also ensure a clean transition point on a promoted primary, and prepare a replica shard for a sync with the promoted primary. --- .../TransportReplicationAction.java | 8 +- .../org/elasticsearch/index/IndexService.java | 5 - .../elasticsearch/index/shard/IndexShard.java | 109 +++++++--- ...k.java => IndexShardOperationPermits.java} | 24 ++- .../cluster/IndicesClusterStateService.java | 8 + .../TransportReplicationActionTests.java | 4 +- .../TransportWriteActionTests.java | 4 +- .../routing/allocation/ShardStateIT.java | 16 +- .../ESIndexLevelReplicationTestCase.java | 49 +++-- ...a => IndexShardOperationPermitsTests.java} | 53 +++-- .../index/shard/IndexShardTests.java | 200 +++++++++++++++--- ...actIndicesClusterStateServiceTestCase.java | 5 + .../flush/SyncedFlushSingleNodeTests.java | 2 +- 13 files changed, 351 insertions(+), 136 deletions(-) rename core/src/main/java/org/elasticsearch/index/shard/{IndexShardOperationsLock.java => IndexShardOperationPermits.java} (86%) rename core/src/test/java/org/elasticsearch/index/shard/{IndexShardOperationsLockTests.java => IndexShardOperationPermitsTests.java} (84%) diff --git 
a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index a8ee1677bbdeb..d1b6657684fa3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -38,7 +38,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -52,7 +51,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; @@ -180,7 +178,7 @@ protected abstract PrimaryResult shardOperationOnPrima /** * Synchronous replica operation on nodes with replica copies. 
This is done under the lock form - * {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)} + * {@link IndexShard#acquireReplicaOperationPermit(long, ActionListener, String)} * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on @@ -584,7 +582,7 @@ protected void doRun() throws Exception { throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, actualAllocationId); } - replica.acquireReplicaOperationLock(request.primaryTerm, this, executor); + replica.acquireReplicaOperationPermit(request.primaryTerm, this, executor); } /** @@ -921,7 +919,7 @@ public void onFailure(Exception e) { } }; - indexShard.acquirePrimaryOperationLock(onAcquired, executor); + indexShard.acquirePrimaryOperationPermit(onAcquired, executor); } class ShardReference implements Releasable { diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index cf41ad8ec1d17..a4ab30116f73f 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -612,11 +612,6 @@ public synchronized void updateMetaData(final IndexMetaData metadata) { rescheduleFsyncTask(durability); } } - - // update primary terms - for (final IndexShard shard : this.shards.values()) { - shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id())); - } } private void rescheduleFsyncTask(Translog.Durability durability) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8da19d7caccd2..1b042666d20a7 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import 
org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -131,6 +130,7 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -197,7 +197,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final ShardPath path; - private final IndexShardOperationsLock indexShardOperationsLock; + private final IndexShardOperationPermits indexShardOperationPermits; private static final EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); // for primaries, we only allow to write when actually started (so the cluster has decided we started) @@ -274,7 +274,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP } this.cachingPolicy = cachingPolicy; } - indexShardOperationsLock = new IndexShardOperationsLock(shardId, logger, threadPool); + indexShardOperationPermits = new IndexShardOperationPermits(shardId, logger, threadPool); searcherWrapper = indexSearcherWrapper; primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); refreshListeners = buildRefreshListeners(); @@ -330,7 +330,6 @@ public ShardFieldData fieldData() { return this.shardFieldData; } - /** * Returns the primary term the index shard is on. 
See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} */ @@ -342,6 +341,7 @@ public long getPrimaryTerm() { * notifies the shard of an increase in the primary term */ public void updatePrimaryTerm(final long newTerm) { + assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; synchronized (mutex) { if (newTerm != primaryTerm) { // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned @@ -356,10 +356,13 @@ public void updatePrimaryTerm(final long newTerm) { // // We could fail the shard in that case, but this will cause it to be removed from the insync allocations list // potentially preventing re-allocation. - assert shardRouting.primary() == false || shardRouting.initializing() == false : - "a started primary shard should never update it's term. shard: " + shardRouting - + " current term [" + primaryTerm + "] new term [" + newTerm + "]"; - assert newTerm > primaryTerm : "primary terms can only go up. 
current [" + primaryTerm + "], new [" + newTerm + "]"; + assert shardRouting.initializing() == false : + "a started primary shard should never update its term; " + + "shard " + shardRouting + ", " + + "current term [" + primaryTerm + "], " + + "new term [" + newTerm + "]"; + assert newTerm > primaryTerm : + "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]"; primaryTerm = newTerm; } } @@ -459,9 +462,9 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta public void relocated(String reason) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { - indexShardOperationsLock.blockOperations(30, TimeUnit.MINUTES, () -> { - // no shard operation locks are being held here, move state from started to relocated - assert indexShardOperationsLock.getActiveOperationsCount() == 0 : + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { + // no shard operation permits are being held here, move state from started to relocated + assert indexShardOperationPermits.getActiveOperationsCount() == 0 : "in-flight operations in progress while moving shard state to relocated"; synchronized (mutex) { if (state != IndexShardState.STARTED) { @@ -978,7 +981,7 @@ public void close(String reason, boolean flushEngine) throws IOException { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners IOUtils.close(engine, refreshListeners); - indexShardOperationsLock.close(); + indexShardOperationPermits.close(); } } } @@ -1845,35 +1848,91 @@ private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { } /** - * Acquire a primary operation lock whenever the shard is ready for indexing. 
If the lock is directly available, the provided - * ActionListener will be called on the calling thread. During relocation hand-off, lock acquisition can be delayed. The provided + * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided + * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided * ActionListener will then be called using the provided executor. */ - public void acquirePrimaryOperationLock(ActionListener onLockAcquired, String executorOnDelay) { + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay) { verifyNotClosed(); verifyPrimary(); - indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, false); + indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false); } + private final AtomicLong pendingPrimaryTerm = new AtomicLong(); + /** - * Acquire a replica operation lock whenever the shard is ready for indexing (see acquirePrimaryOperationLock). If the given primary - * term is lower then the one in {@link #shardRouting} an {@link IllegalArgumentException} is thrown. + * Acquire a replica operation permit whenever the shard is ready for indexing (see + * {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than the one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified + * name.
+ * + * @param operationPrimaryTerm the operation primary term + * @param onPermitAcquired the listener for permit acquisition + * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed */ - public void acquireReplicaOperationLock(long opPrimaryTerm, ActionListener onLockAcquired, String executorOnDelay) { + public void acquireReplicaOperationPermit( + final long operationPrimaryTerm, final ActionListener onPermitAcquired, final String executorOnDelay) { verifyNotClosed(); verifyReplicationTarget(); - if (primaryTerm > opPrimaryTerm) { - // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException - throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", - shardId, opPrimaryTerm, primaryTerm)); + if (operationPrimaryTerm > primaryTerm + && pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) { + try { + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { + if (operationPrimaryTerm > primaryTerm) { + primaryTerm = operationPrimaryTerm; + } + }); + } catch (final InterruptedException | TimeoutException e) { + onPermitAcquired.onFailure(e); + } } - indexShardOperationsLock.acquire(onLockAcquired, executorOnDelay, true); + final long currentPrimaryTerm = primaryTerm; + if (operationPrimaryTerm == currentPrimaryTerm) { + indexShardOperationPermits.acquire( + new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + assert operationPrimaryTerm <= primaryTerm + : "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]"; + if (operationPrimaryTerm < primaryTerm) { + releasable.close(); + onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, primaryTerm, onPermitAcquired); + } else { + onPermitAcquired.onResponse(releasable); + } + } + + @Override + public void 
onFailure(final Exception e) { + onPermitAcquired.onFailure(e); + } + }, + executorOnDelay, + true); + } else { + onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, currentPrimaryTerm, onPermitAcquired); + } + } + + private static void onOperationPrimaryTermIsTooOld( + final ShardId shardId, + final long operationPrimaryTerm, + final long primaryTerm, + final ActionListener onPermitAcquired) { + final String message = String.format( + Locale.ROOT, + "%s operation primary term [%d] is too old (current [%d])", + shardId, + operationPrimaryTerm, + primaryTerm); + onPermitAcquired.onFailure(new IllegalStateException(message)); } public int getActiveOperationsCount() { - return indexShardOperationsLock.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close + return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close } private final AsyncIOProcessor translogSyncProcessor = new AsyncIOProcessor(logger, 1024) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsLock.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java similarity index 86% rename from core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsLock.java rename to core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 70e2037664f7f..016067259c11b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsLock.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; @@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; -public class IndexShardOperationsLock implements Closeable { +final class IndexShardOperationPermits implements Closeable { private final ShardId shardId; private final Logger logger; private final ThreadPool threadPool; @@ -44,10 +45,10 @@ public class IndexShardOperationsLock implements Closeable { private static final int TOTAL_PERMITS = Integer.MAX_VALUE; // fair semaphore to ensure that blockOperations() does not starve under thread contention final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); - @Nullable private List> delayedOperations; // operations that are delayed due to relocation hand-off + @Nullable private List> delayedOperations; // operations that are delayed private volatile boolean closed; - public IndexShardOperationsLock(ShardId shardId, Logger logger, ThreadPool threadPool) { + IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) { this.shardId = shardId; this.logger = logger; this.threadPool = threadPool; @@ -67,7 +68,7 @@ public void close() { * @param onBlocked the action to run once the block has been acquired * @throws InterruptedException if calling thread is interrupted * @throws TimeoutException if timed out waiting for in-flight operations to finish - * @throws IndexShardClosedException if operation lock has been closed + * @throws IndexShardClosedException if operation permit has been closed */ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException { if (closed) { @@ -75,6 +76,7 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) } try { if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { + assert semaphore.availablePermits() == 0; try { onBlocked.run(); } finally { @@ -91,7 +93,7 @@ public void 
blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) } if (queuedActions != null) { // Try acquiring permits on fresh thread (for two reasons): - // - blockOperations is called on recovery thread which can be expected to be interrupted when recovery is cancelled. + // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled. // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by // ThreadedActionListener if the queue of the thread pool on which it submits is full. // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure @@ -106,14 +108,14 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) } /** - * Acquires a lock whenever lock acquisition is not blocked. If the lock is directly available, the provided - * ActionListener will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, lock - * acquisition can be delayed. The provided ActionListener will then be called using the provided executor once blockOperations - * terminates. + * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided + * {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)}, + * permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no + * longer blocked. 
* - * @param onAcquired ActionListener that is invoked once acquisition is successful or failed + * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for delayed call - * @param forceExecution whether the runnable should force its execution in case it gets rejected + * @param forceExecution whether the runnable should force its execution in case it gets rejected */ public void acquire(ActionListener onAcquired, String executorOnDelay, boolean forceExecution) { if (closed) { diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 7371bde893994..147a952507ad9 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -561,6 +561,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes); final Set initializingIds = allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes); + shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id())); shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } } catch (Exception e) { @@ -737,6 +738,13 @@ public interface Shard { */ void updateRoutingEntry(ShardRouting shardRouting) throws IOException; + /** + * Update the primary term. This method should only be invoked on primary shards. + * + * @param primaryTerm the new primary term + */ + void updatePrimaryTerm(long primaryTerm); + /** * Notifies the service of the current allocation ids in the cluster state. * See {@link GlobalCheckpointTracker#updateAllocationIdsFromMaster(Set, Set)} for details. 
diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0f8071cce367c..89026d9d1dbd3 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -1091,7 +1091,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[1]; @@ -1103,7 +1103,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index ca16e816b47fe..f0690ad67b5f8 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ 
b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -444,7 +444,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationLock(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[1]; @@ -456,7 +456,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationLock(anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), any(ActionListener.class), anyString()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java index 7e05448bf91ab..8338abd520fc9 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -18,9 +18,11 @@ */ package org.elasticsearch.cluster.routing.allocation; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; 
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -48,10 +50,10 @@ public void testPrimaryFailureIncreasesTerm() throws Exception { indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null); logger.info("--> waiting for a yellow index"); - // JDK 9 type inference gets confused, so we have to help the - // type inference - assertBusy(((Runnable) () -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), - equalTo(ClusterHealthStatus.YELLOW)))); + ensureYellow(); + + // this forces the primary term to propagate to the replicas + client().index(new IndexRequest("test", "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON)).get(); final long term0 = shard == 0 ? 2 : 1; final long term1 = shard == 1 ? 2 : 1; @@ -63,13 +65,13 @@ public void testPrimaryFailureIncreasesTerm() throws Exception { assertPrimaryTerms(term0, term1); } - protected void assertPrimaryTerms(long term0, long term1) { + protected void assertPrimaryTerms(long shard0Term, long shard1Term) { for (String node : internalCluster().getNodeNames()) { logger.debug("--> asserting primary terms terms on [{}]", node); ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); IndexMetaData metaData = state.metaData().index("test"); - assertThat(metaData.primaryTerm(0), equalTo(term0)); - assertThat(metaData.primaryTerm(1), equalTo(term1)); + assertThat(metaData.primaryTerm(0), equalTo(shard0Term)); + assertThat(metaData.primaryTerm(1), equalTo(shard1Term)); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); IndexService indexService = indicesService.indexService(metaData.getIndex()); if (indexService != null) { diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java 
index 0f68dc3c50a76..8e27ab5e9d392 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -47,6 +47,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; @@ -59,6 +61,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; import java.io.IOException; @@ -225,7 +228,6 @@ public synchronized void addReplica(IndexShard replica) { assert shardRoutings().stream() .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; - replica.updatePrimaryTerm(primary.getPrimaryTerm()); replicas.add(replica); updateAllocationIDsOnPrimary(); } @@ -254,17 +256,13 @@ public synchronized List getReplicas() { */ public synchronized void promoteReplicaToPrimary(IndexShard replica) throws IOException { final long newTerm = indexMetaData.primaryTerm(shardId.id()) + 1; - IndexMetaData.Builder newMetaData = - IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm); + IndexMetaData.Builder newMetaData = IndexMetaData.builder(indexMetaData).primaryTerm(shardId.id(), newTerm); indexMetaData = newMetaData.build(); - for (IndexShard shard: replicas) { - shard.updatePrimaryTerm(newTerm); - } - boolean found = 
replicas.remove(replica); - assert found; + assertTrue(replicas.remove(replica)); closeShards(primary); primary = replica; - replica.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); + primary.updateRoutingEntry(replica.routingEntry().moveActiveReplicaToPrimary()); + primary.updatePrimaryTerm(newTerm); updateAllocationIDsOnPrimary(); } @@ -476,15 +474,32 @@ public void performOn( final ReplicaRequest request, final long globalCheckpoint, final ActionListener listener) { - try { - IndexShard replica = replicationGroup.replicas.stream() + IndexShard replica = replicationGroup.replicas.stream() .filter(s -> replicaRouting.isSameAllocation(s.routingEntry())).findFirst().get(); - replica.updateGlobalCheckpointOnReplica(globalCheckpoint); - performOnReplica(request, replica); - listener.onResponse(new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint())); - } catch (Exception e) { - listener.onFailure(e); - } + replica.acquireReplicaOperationPermit( + request.primaryTerm(), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + try { + replica.updateGlobalCheckpointOnReplica(globalCheckpoint); + performOnReplica(request, replica); + releasable.close(); + listener.onResponse( + new ReplicaResponse( + replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint())); + } catch (final Exception e) { + Releasables.closeWhileHandlingException(releasable); + listener.onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }, + ThreadPool.Names.INDEX); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationsLockTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java similarity index 84% rename from core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationsLockTests.java rename to 
core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index d3df93513d008..18a250a42827d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationsLockTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -37,16 +37,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; -import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -public class IndexShardOperationsLockTests extends ESTestCase { +public class IndexShardOperationPermitsTests extends ESTestCase { private static ThreadPool threadPool; - private IndexShardOperationsLock block; + private IndexShardOperationPermits permits; @BeforeClass public static void setupThreadPool() { @@ -61,13 +60,13 @@ public static void shutdownThreadPool() { @Before public void createIndexShardOperationsLock() { - block = new IndexShardOperationsLock(new ShardId("blubb", "id", 0), logger, threadPool); + permits = new IndexShardOperationPermits(new ShardId("blubb", "id", 0), logger, threadPool); } @After public void checkNoInflightOperations() { - assertThat(block.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE)); - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.semaphore.availablePermits(), equalTo(Integer.MAX_VALUE)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); } public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException { @@ -87,7 +86,7 @@ public void onResponse(Releasable releasable) { Thread thread = new Thread() { public void run() { latch.countDown(); - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); } }; futures.add(future); @@ -123,29 +122,29 @@ public void run() { public void 
testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); assertTrue(future.isDone()); future.get().close(); } public void testOperationsIfClosed() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); - block.close(); - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.close(); + permits.acquire(future, ThreadPool.Names.GENERIC, true); ExecutionException exception = expectThrows(ExecutionException.class, future::get); assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class)); } public void testBlockIfClosed() throws ExecutionException, InterruptedException { - block.close(); - expectThrows(IndexShardClosedException.class, () -> block.blockOperations(randomInt(10), TimeUnit.MINUTES, + permits.close(); + expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES, () -> { throw new IllegalArgumentException("fake error"); })); } public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); try (Releasable releasable = blockAndWait()) { - block.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); assertFalse(future.isDone()); } future.get(1, TimeUnit.HOURS).close(); @@ -192,8 +191,8 @@ public void onResponse(Releasable releasable) { context.putHeader("foo", "bar"); context.putTransient("bar", "baz"); // test both with and without a executor name - block.acquire(future, ThreadPool.Names.GENERIC, true); - block.acquire(future2, null, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future2, null, true); } assertFalse(future.isDone()); } @@ -209,7 
+208,7 @@ protected Releasable blockAndWait() throws InterruptedException { IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0)); threadPool.generic().execute(() -> { try { - block.blockOperations(1, TimeUnit.MINUTES, () -> { + permits.blockOperations(1, TimeUnit.MINUTES, () -> { try { blockAcquired.countDown(); releaseBlock.await(); @@ -241,31 +240,31 @@ protected Releasable blockAndWait() throws InterruptedException { public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); - block.acquire(future1, ThreadPool.Names.GENERIC, true); + permits.acquire(future1, ThreadPool.Names.GENERIC, true); assertTrue(future1.isDone()); - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); PlainActionFuture future2 = new PlainActionFuture<>(); - block.acquire(future2, ThreadPool.Names.GENERIC, true); + permits.acquire(future2, ThreadPool.Names.GENERIC, true); assertTrue(future2.isDone()); - assertThat(block.getActiveOperationsCount(), equalTo(2)); + assertThat(permits.getActiveOperationsCount(), equalTo(2)); future1.get().close(); - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); future1.get().close(); // check idempotence - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); future2.get().close(); - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); try (Releasable releasable = blockAndWait()) { - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); } PlainActionFuture future3 = new PlainActionFuture<>(); - block.acquire(future3, ThreadPool.Names.GENERIC, true); + permits.acquire(future3, 
ThreadPool.Names.GENERIC, true); assertTrue(future3.isDone()); - assertThat(block.getActiveOperationsCount(), equalTo(1)); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); future3.get().close(); - assertThat(block.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperationsCount(), equalTo(0)); } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c925775fa5bd0..8cea0278d0a75 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -33,6 +33,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -118,8 +119,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -130,11 +134,13 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; +import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static 
org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; @@ -262,20 +268,20 @@ public void testClosesPreventsNewOperations() throws InterruptedException, Execu closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { - indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } try { - indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), null, ThreadPool.Names.INDEX); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } } - public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -287,10 +293,10 @@ public void testOperationLocksOnPrimaryShards() throws InterruptedException, Exe // simulate promotion indexShard = newStartedShard(false); ShardRouting replicaRouting = indexShard.routingEntry(); - indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); ShardRouting primaryRouting = TestShardRouting.newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); } else { indexShard = newStartedShard(true); } @@ -298,15 +304,15 @@ public void 
testOperationLocksOnPrimaryShards() throws InterruptedException, Exe assertEquals(0, indexShard.getActiveOperationsCount()); if (indexShard.routingEntry().isRelocationTarget() == false) { try { - indexShard.acquireReplicaOperationLock(primaryTerm, null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(primaryTerm, null, ThreadPool.Names.INDEX); fail("shard shouldn't accept operations as replica"); } catch (IllegalStateException ignored) { } } - Releasable operation1 = acquirePrimaryOperationLockBlockingly(indexShard); + Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard); assertEquals(1, indexShard.getActiveOperationsCount()); - Releasable operation2 = acquirePrimaryOperationLockBlockingly(indexShard); + Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard); assertEquals(2, indexShard.getActiveOperationsCount()); Releasables.close(operation1, operation2); @@ -315,20 +321,20 @@ public void testOperationLocksOnPrimaryShards() throws InterruptedException, Exe closeShards(indexShard); } - private Releasable acquirePrimaryOperationLockBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { + private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); return fut.get(); } - private Releasable acquireReplicaOperationLockBlockingly(IndexShard indexShard, long opPrimaryTerm) + private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationLock(opPrimaryTerm, fut, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, fut, 
ThreadPool.Names.INDEX); return fut.get(); } - public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException { + public void testOperationPermitOnReplicaShards() throws InterruptedException, ExecutionException, IOException, BrokenBarrierException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -367,33 +373,159 @@ public void testOperationLocksOnReplicaShards() throws InterruptedException, Exe assertEquals(0, indexShard.getActiveOperationsCount()); if (shardRouting.primary() == false) { - try { - indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX); - fail("shard shouldn't accept primary ops"); - } catch (IllegalStateException ignored) { - - } + final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX)); + assertThat(e, hasToString(containsString("shard is not a primary"))); } final long primaryTerm = indexShard.getPrimaryTerm(); - Releasable operation1 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); + final Releasable operation1 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); assertEquals(1, indexShard.getActiveOperationsCount()); - Releasable operation2 = acquireReplicaOperationLockBlockingly(indexShard, primaryTerm); + final Releasable operation2 = acquireReplicaOperationPermitBlockingly(indexShard, primaryTerm); assertEquals(2, indexShard.getActiveOperationsCount()); - try { - indexShard.acquireReplicaOperationLock(primaryTerm - 1, null, ThreadPool.Names.INDEX); - fail("you can not increment the operation counter with an older primary term"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("operation term")); - assertThat(e.getMessage(), containsString("too old")); + { + final AtomicBoolean onResponse = new AtomicBoolean(); + final AtomicBoolean onFailure = new AtomicBoolean(); + final 
AtomicReference onFailureException = new AtomicReference<>(); + ActionListener onLockAcquired = new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + onResponse.set(true); + } + + @Override + public void onFailure(Exception e) { + onFailure.set(true); + onFailureException.set(e); + } + }; + + indexShard.acquireReplicaOperationPermit(primaryTerm - 1, onLockAcquired, ThreadPool.Names.INDEX); + + assertFalse(onResponse.get()); + assertTrue(onFailure.get()); + assertThat(onFailureException.get(), instanceOf(IllegalStateException.class)); + assertThat( + onFailureException.get(), hasToString(containsString("operation primary term [" + (primaryTerm - 1) + "] is too old"))); } - // but you can increment with a newer one.. - acquireReplicaOperationLockBlockingly(indexShard, primaryTerm + 1 + randomInt(20)).close(); - Releasables.close(operation1, operation2); - assertEquals(0, indexShard.getActiveOperationsCount()); + { + final AtomicBoolean onResponse = new AtomicBoolean(); + final AtomicBoolean onFailure = new AtomicBoolean(); + final CyclicBarrier barrier = new CyclicBarrier(2); + // but you can not increment with a new primary term until the operations on the older primary term complete + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquireReplicaOperationPermit( + primaryTerm + 1 + randomInt(20), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + onResponse.set(true); + releasable.close(); + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onFailure(Exception e) { + onFailure.set(true); + } + }, + ThreadPool.Names.SAME); + }); + thread.start(); + barrier.await(); + // our operation should be blocked until the previous operations complete + 
assertFalse(onResponse.get()); + assertFalse(onFailure.get()); + Releasables.close(operation1); + // our operation should still be blocked + assertFalse(onResponse.get()); + assertFalse(onFailure.get()); + Releasables.close(operation2); + barrier.await(); + // now lock acquisition should have succeeded + assertTrue(onResponse.get()); + assertFalse(onFailure.get()); + thread.join(); + assertEquals(0, indexShard.getActiveOperationsCount()); + } + + closeShards(indexShard); + } + + public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException { + final IndexShard indexShard = newStartedShard(false); + + final CyclicBarrier barrier = new CyclicBarrier(3); + final CountDownLatch latch = new CountDownLatch(2); + + final long primaryTerm = indexShard.getPrimaryTerm(); + final AtomicLong counter = new AtomicLong(); + final AtomicReference onFailure = new AtomicReference<>(); + + final Function function = b -> () -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquireReplicaOperationPermit( + primaryTerm + 1 + (b ? 
1 : 0), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + counter.incrementAndGet(); + latch.countDown(); + releasable.close(); + } + + @Override + public void onFailure(Exception e) { + onFailure.set(e); + latch.countDown(); + } + }, + ThreadPool.Names.INDEX); + }; + + final Thread first = new Thread(function.apply(randomBoolean())); + final Thread second = new Thread(function.apply(randomBoolean())); + + first.start(); + second.start(); + + // the two threads synchronize attempting to acquire an operation permit + barrier.await(); + + // we wait for both operations to complete + latch.await(); + + first.join(); + second.join(); + + final Exception e; + if ((e = onFailure.get()) != null) { + /* + * If one thread tried to set the primary term to a higher value than the other thread and the thread with the higher term won + * the race, then the other thread lost the race and only one operation should have been executed. + */ + assertThat(e, instanceOf(IllegalStateException.class)); + assertThat(e, hasToString(matches("operation primary term \\[\\d+\\] is too old"))); + assertThat(counter.get(), equalTo(1L)); + } else { + assertThat(counter.get(), equalTo(2L)); + } closeShards(indexShard); } @@ -701,7 +833,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { } }); - try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) { + try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { // start finalization of recovery recoveryThread.start(); latch.await(); @@ -711,7 +843,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { // recovery can be now finalized recoveryThread.join(); assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); - try (Releasable ignored = acquirePrimaryOperationLockBlockingly(shard)) { + try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { // lock can again be acquired assertThat(shard.state(), 
equalTo(IndexShardState.RELOCATED)); } @@ -740,7 +872,7 @@ public void onResponse(Releasable releasable) { super.onResponse(releasable); } }; - shard.acquirePrimaryOperationLock(onLockAcquired, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX); onLockAcquiredActions.add(onLockAcquired); } @@ -764,7 +896,7 @@ public void testStressRelocated() throws Exception { indexThreads[i] = new Thread() { @Override public void run() { - try (Releasable operationLock = acquirePrimaryOperationLockBlockingly(shard)) { + try (Releasable operationLock = acquirePrimaryOperationPermitBlockingly(shard)) { allPrimaryOperationLocksAcquired.countDown(); barrier.await(); } catch (InterruptedException | BrokenBarrierException | ExecutionException e) { diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index b9e1b23bda190..0a106530f05cc 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -362,6 +362,11 @@ public void updateRoutingEntry(ShardRouting shardRouting) throws IOException { this.shardRouting = shardRouting; } + @Override + public void updatePrimaryTerm(long primaryTerm) { + term = primaryTerm; + } + @Override public void updateAllocationIdsFromMaster(Set activeAllocationIds, Set initializingAllocationIds) { this.activeAllocationIds = activeAllocationIds; diff --git a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index 59784456e988b..20c9f3613e5aa 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ 
b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -114,7 +114,7 @@ public void testSyncFailsIfOperationIsInFlight() throws InterruptedException, Ex SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); PlainActionFuture fut = new PlainActionFuture<>(); - shard.acquirePrimaryOperationLock(fut, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); try (Releasable operationLock = fut.get()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener);