From 15fc6e119154395488b199161d3dc826d7ad1a8c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 May 2019 12:21:15 -0400 Subject: [PATCH 1/9] Execute actions under permit in primary mode only Today when executing an action on a primary shard under permit, we do not enforce that the shard is in primary mode before executing the action. This commit addresses this by wrapping actions to be executed under permit in a check that the shard is in primary mode before executing the action. --- .../index/seqno/RetentionLeaseActions.java | 17 ++--- .../elasticsearch/index/shard/IndexShard.java | 25 +++++++- .../index/shard/IndexShardTests.java | 63 +++++++++++++++---- 3 files changed, 79 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index c69a4c6fab042..74c98bf3dca19 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -28,8 +28,6 @@ import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -45,7 +43,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; import java.util.Objects; import java.util.function.Supplier; @@ -88,14 +85,10 @@ abstract static class TransportRetentionLeaseAction> extend @Override protected ShardsIterator shards(final ClusterState state, final InternalRequest request) { - final IndexShardRoutingTable shardRoutingTable = state + return state .routingTable() - .shardRoutingTable(request.concreteIndex(), request.request().getShardId().id()); - if (shardRoutingTable.primaryShard().active()) { - return shardRoutingTable.primaryShardIt(); - } else { - return new PlainShardIterator(request.request().getShardId(), Collections.emptyList()); - } + .shardRoutingTable(request.concreteIndex(), request.request().getShardId().id()) + .primaryShardIt(); } @Override @@ -174,6 +167,7 @@ void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest reques protected Writeable.Reader getResponseReader() { return Response::new; } + } @Override @@ -400,9 +394,10 @@ public static class Response extends ActionResponse { public Response() { } - Response(StreamInput in) throws IOException { + Response(final StreamInput in) throws IOException { super(in); } + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 11e4fb81d9fbe..db280a6cc6cf6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.shard; import com.carrotsearch.hppc.ObjectLongMap; - import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CheckIndex; @@ -2496,7 +2495,7 @@ public void acquirePrimaryOperationPermit(ActionListener onPermitAcq verifyNotClosed(); assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; - indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo); + indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo); } /** @@ -2507,7 +2506,27 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener verifyNotClosed(); assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting; - asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); + asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit()); + } + + /** + * Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before + * executing the action. + * + * @param listener the listener to wrap + * @return the wrapped listener + */ + private ActionListener wrapPrimaryOperationPermitListener(final ActionListener listener) { + return ActionListener.delegateFailure( + listener, + (l, r) -> { + if (replicationTracker.isPrimaryMode() == false) { + r.close(); + l.onFailure(new IllegalStateException("shard is not in primary mode")); + } else { + l.onResponse(r); + } + }); } private void asyncBlockOperations(ActionListener onPermitAcquired, long timeout, TimeUnit timeUnit) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0be7b4433fac3..ff05450054837 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -636,11 +636,13 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; + final boolean isPrimaryMode; if (randomBoolean()) { // relocation target indexShard = newShard(newShardRouting(shardId, "local_node", "other node", true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing()))); assertEquals(0, indexShard.getActiveOperationsCount()); + isPrimaryMode = false; } else if (randomBoolean()) { // simulate promotion indexShard = newStartedShard(false); @@ -660,18 +662,56 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { if (randomBoolean()) { assertBusy(() -> assertEquals(0, indexShard.getActiveOperationsCount())); } + isPrimaryMode = true; } else { indexShard = newStartedShard(true); assertEquals(0, indexShard.getActiveOperationsCount()); + isPrimaryMode = true; } - final long primaryTerm = indexShard.getPendingPrimaryTerm(); - Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard); - assertEquals(1, indexShard.getActiveOperationsCount()); - Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard); - assertEquals(2, indexShard.getActiveOperationsCount()); + assert indexShard.getReplicationTracker().isPrimaryMode() == isPrimaryMode; + if (isPrimaryMode) { + Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard); + assertEquals(1, indexShard.getActiveOperationsCount()); + Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard); + assertEquals(2, indexShard.getActiveOperationsCount()); - Releasables.close(operation1, operation2); - assertEquals(0, indexShard.getActiveOperationsCount()); + Releasables.close(operation1, operation2); + assertEquals(0, indexShard.getActiveOperationsCount()); + } else { + indexShard.acquirePrimaryOperationPermit( + new ActionListener<>() { + @Override + public void onResponse(final Releasable releasable) { + throw new AssertionError(); + } + + @Override + public void onFailure(final Exception e) { + assertThat(e, instanceOf(IllegalStateException.class)); + assertThat(e, hasToString(containsString("shard is not in primary mode"))); + } + }, + ThreadPool.Names.SAME, + "test"); + + final CountDownLatch latch = new CountDownLatch(1); + indexShard.acquireAllPrimaryOperationsPermits( + new ActionListener<>() { + @Override + public void onResponse(final Releasable releasable) { + throw new AssertionError(); + } + + @Override + public void onFailure(final Exception e) { + assertThat(e, instanceOf(IllegalStateException.class)); + assertThat(e, hasToString(containsString("shard is not in primary mode"))); + latch.countDown(); + } + }, + TimeValue.timeValueSeconds(30)); + latch.await(); + } if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) { assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm, @@ -1688,10 +1728,8 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { // recovery can be now finalized recoveryThread.join(); assertTrue(shard.isRelocatedPrimary()); - try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) { - // lock can again be acquired - assertTrue(shard.isRelocatedPrimary()); - } + final ExecutionException e = expectThrows(ExecutionException.class, () -> acquirePrimaryOperationPermitBlockingly(shard)); + assertThat(e.getCause(), instanceOf(IndexShardRelocatedException.class)); closeShards(shard); } @@ -1722,7 +1760,8 @@ public void onResponse(Releasable releasable) { } for (PlainActionFuture onLockAcquired : onLockAcquiredActions) { - assertNotNull(onLockAcquired.get(30, TimeUnit.SECONDS)); + final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); + assertThat(e.getCause(), instanceOf(IndexShardRelocatedException.class)); } recoveryThread.join(); From ec689bb7efbb8d62cfa0fe91eef1a995caaed32d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 May 2019 14:46:08 -0400 Subject: [PATCH 2/9] Fix tests relying on earlier version of this change --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index ff05450054837..e4abaae230002 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1729,7 +1729,8 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { recoveryThread.join(); assertTrue(shard.isRelocatedPrimary()); final ExecutionException e = expectThrows(ExecutionException.class, () -> acquirePrimaryOperationPermitBlockingly(shard)); - assertThat(e.getCause(), instanceOf(IndexShardRelocatedException.class)); + assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); closeShards(shard); } @@ -1761,7 +1762,8 @@ public void onResponse(Releasable releasable) { for (PlainActionFuture onLockAcquired : onLockAcquiredActions) { final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); - assertThat(e.getCause(), instanceOf(IndexShardRelocatedException.class)); + assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); } recoveryThread.join(); From d6bd1b89d6bf7654ce8c482e78d66696cbe3c9f3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 May 2019 14:49:12 -0400 Subject: [PATCH 3/9] Fix test --- .../test/java/org/elasticsearch/index/shard/IndexShardTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e4abaae230002..c78ce8811950d 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -669,6 +669,7 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { isPrimaryMode = true; } assert indexShard.getReplicationTracker().isPrimaryMode() == isPrimaryMode; + final long primaryTerm = indexShard.getPendingPrimaryTerm(); if (isPrimaryMode) { Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard); assertEquals(1, indexShard.getActiveOperationsCount()); From bb46faadab9666d05ca13cd763b2c4743330e046 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 May 2019 14:49:45 -0400 Subject: [PATCH 4/9] Fix variable name --- .../java/org/elasticsearch/index/shard/IndexShardTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index c78ce8811950d..67df051d4c611 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -669,7 +669,7 @@ public void testOperationPermitsOnPrimaryShards() throws Exception { isPrimaryMode = true; } assert indexShard.getReplicationTracker().isPrimaryMode() == isPrimaryMode; - final long primaryTerm = indexShard.getPendingPrimaryTerm(); + final long pendingPrimaryTerm = indexShard.getPendingPrimaryTerm(); if (isPrimaryMode) { Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard); assertEquals(1, indexShard.getActiveOperationsCount()); @@ -715,7 +715,7 @@ public void onFailure(final Exception e) { } if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) { - assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm, + assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(pendingPrimaryTerm, indexShard.getGlobalCheckpoint(), indexShard.getMaxSeqNoOfUpdatesOrDeletes(), new ActionListener() { @Override public void onResponse(Releasable releasable) { From 923e700f27f8c2efcd41ad831caf5bbd9dada5df Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 May 2019 17:59:45 -0400 Subject: [PATCH 5/9] Fix test --- .../index/shard/IndexShardTests.java | 66 ++++++++++++++----- 1 file changed, 49 insertions(+), 17 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 67df051d4c611..396c6badc05ef 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1739,32 +1739,64 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { final IndexShard shard = newStartedShard(true); IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node")); + final CountDownLatch startRecovery = new CountDownLatch(1); + final CountDownLatch relocationStarted = new CountDownLatch(1); Thread recoveryThread = new Thread(() -> { try { - shard.relocated(primaryContext -> {}); + startRecovery.await(); + shard.relocated(primaryContext -> relocationStarted.countDown()); } catch (InterruptedException e) { throw new RuntimeException(e); } }); recoveryThread.start(); - List> onLockAcquiredActions = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - PlainActionFuture onLockAcquired = new PlainActionFuture() { - @Override - public void onResponse(Releasable releasable) { - releasable.close(); - super.onResponse(releasable); - } - }; - shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i); - onLockAcquiredActions.add(onLockAcquired); - } - for (PlainActionFuture onLockAcquired : onLockAcquiredActions) { - final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); - assertThat(e.getCause(), instanceOf(IllegalStateException.class)); - assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + final int numberOfAcquisitions = randomIntBetween(1, 10); + final int recoveryIndex = randomIntBetween(1, numberOfAcquisitions); + + for (int i = 0; i < numberOfAcquisitions; i++) { + + final PlainActionFuture onLockAcquired; + final Runnable assertion; + if (i < recoveryIndex) { + final AtomicBoolean invoked = new AtomicBoolean(); + onLockAcquired = new PlainActionFuture<>() { + + @Override + public void onResponse(Releasable releasable) { + invoked.set(true); + releasable.close(); + super.onResponse(releasable); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(); + } + + }; + assertion = () -> assertTrue(invoked.get()); + } else if (recoveryIndex == i) { + startRecovery.countDown(); + relocationStarted.await(); + onLockAcquired = new PlainActionFuture<>(); + assertion = () -> { + final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); + assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + }; + } else { + onLockAcquired = new PlainActionFuture<>(); + assertion = () -> { + final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); + assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + }; + } + + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i); + assertion.run(); } recoveryThread.join(); From 4fd9b377cb70f7f4a2642d71d05a9d96916d870e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 May 2019 19:05:05 -0400 Subject: [PATCH 6/9] Retry when not in primary mode --- .../replication/TransportReplicationAction.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 6edaa95033997..1178c7ede2cde 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -307,10 +307,18 @@ protected void doRun() throws Exception { primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm); } - acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap( - releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), - this::onFailure - )); + acquirePrimaryOperationPermit( + indexShard, + primaryRequest.getRequest(), + ActionListener.wrap( + releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), + e -> { + if (e instanceof IllegalStateException && e.getMessage().equals("shard is not in primary mode")) { + onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e)); + } else { + onFailure(e); + } + })); } void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) { From ff90a283e6786e647c90aaf93163704b43498599 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 May 2019 22:03:46 -0400 Subject: [PATCH 7/9] Add unit test --- .../TransportReplicationActionTests.java | 48 ++++++++++++++++++- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 12cc9097b652c..396b8429bf9ad 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -390,6 +390,43 @@ public void testNotStartedPrimary() { assertIndexShardCounter(0); } + public void testShardNotInPrimaryMode() { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + final ClusterState state = state(index, true, ShardRoutingState.RELOCATING); + setState(clusterService, state); + final ReplicationTask task = maybeTask(); + final Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + final AtomicBoolean executed = new AtomicBoolean(); + + final ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id()); + final TransportReplicationAction.ConcreteShardRequest primaryRequest + = new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm); + + isPrimaryMode.set(false); + + new TestAction(Settings.EMPTY, "internal:test-action", transportService, clusterService, shardStateAction, threadPool) { + @Override + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { + assertPhase(task, "primary"); + assertFalse(executed.getAndSet(true)); + super.shardOperationOnPrimary(shardRequest, primary, listener); + } + }.new AsyncPrimaryAction(primaryRequest, listener, task).run(); + + assertFalse(executed.get()); + assertIndexShardCounter(0); // no permit should be held + + final ExecutionException e = expectThrows(ExecutionException.class, listener::get); + assertThat(e.getCause(), instanceOf(ReplicationOperation.RetryOnPrimaryException.class)); + assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); + assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause().getCause(), hasToString(containsString("shard is not in primary mode"))); + } + /** * When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from * the relocation source to the relocation target. If relocation source receives and processes this cluster state @@ -1126,6 +1163,8 @@ private void assertIndexShardCounter(int expected) { private final AtomicBoolean isRelocated = new AtomicBoolean(false); + private final AtomicBoolean isPrimaryMode = new AtomicBoolean(true); + /** * Sometimes build a ReplicationTask for tracking the phase of the * TransportReplicationAction. Since TransportReplicationAction has to work @@ -1273,8 +1312,13 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService when(indexShard.shardId()).thenReturn(shardId); doAnswer(invocation -> { ActionListener callback = (ActionListener) invocation.getArguments()[0]; - count.incrementAndGet(); - callback.onResponse(count::decrementAndGet); + if (isPrimaryMode.get()) { + count.incrementAndGet(); + callback.onResponse(count::decrementAndGet); + + } else { + callback.onFailure(new IllegalStateException("shard is not in primary mode")); + } return null; }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); doAnswer(invocation -> { From 799f73119c04d3f56d2b6d8c152583e11c7e4dc3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 May 2019 11:37:57 -0400 Subject: [PATCH 8/9] Add dedicated exception --- .../elasticsearch/ElasticsearchException.java | 7 +++- .../TransportReplicationAction.java | 3 +- .../elasticsearch/index/shard/IndexShard.java | 2 +- .../shard/ShardNotInPrimaryModeException.java | 36 +++++++++++++++++++ .../ExceptionSerializationTests.java | 2 ++ .../TransportReplicationActionTests.java | 7 ++-- .../index/shard/IndexShardTests.java | 10 +++--- 7 files changed, 57 insertions(+), 10 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/shard/ShardNotInPrimaryModeException.java diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 861228d221778..85df20d849afa 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1022,7 +1022,12 @@ private enum ElasticsearchExceptionHandle { org.elasticsearch.index.seqno.RetentionLeaseNotFoundException.class, org.elasticsearch.index.seqno.RetentionLeaseNotFoundException::new, 154, - Version.V_6_7_0); + Version.V_6_7_0), + SHARD_NOT_IN_PRIMARY_MODE_EXCEPTION( + org.elasticsearch.index.shard.ShardNotInPrimaryModeException.class, + org.elasticsearch.index.shard.ShardNotInPrimaryModeException::new, + 155, + Version.V_6_8_1); final Class exceptionClass; final CheckedFunction constructor; diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1178c7ede2cde..d19009433deb5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -63,6 +63,7 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.shard.ShardNotInPrimaryModeException; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; @@ -313,7 +314,7 @@ protected void doRun() throws Exception { ActionListener.wrap( releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), e -> { - if (e instanceof IllegalStateException && e.getMessage().equals("shard is not in primary mode")) { + if (e instanceof ShardNotInPrimaryModeException) { onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e)); } else { onFailure(e); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index db280a6cc6cf6..ba087c4220f97 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2522,7 +2522,7 @@ private ActionListener wrapPrimaryOperationPermitListener(final Acti (l, r) -> { if (replicationTracker.isPrimaryMode() == false) { r.close(); - l.onFailure(new IllegalStateException("shard is not in primary mode")); + l.onFailure(new ShardNotInPrimaryModeException(shardId, state)); } else { l.onResponse(r); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardNotInPrimaryModeException.java b/server/src/main/java/org/elasticsearch/index/shard/ShardNotInPrimaryModeException.java new file mode 100644 index 0000000000000..8bc23dcdd00f7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardNotInPrimaryModeException.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +public class ShardNotInPrimaryModeException extends IllegalIndexShardStateException { + + public ShardNotInPrimaryModeException(final ShardId shardId, final IndexShardState currentState) { + super(shardId, currentState, "shard is not in primary mode"); + } + + public ShardNotInPrimaryModeException(final StreamInput in) throws IOException { + super(in); + } + +} diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 1fac56886de45..a0aafbb41d371 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardNotInPrimaryModeException; import org.elasticsearch.indices.IndexTemplateMissingException; import org.elasticsearch.indices.InvalidIndexTemplateException; import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException; @@ -816,6 +817,7 @@ public void testIds() { ids.put(152, NoSuchRemoteClusterException.class); ids.put(153, RetentionLeaseAlreadyExistsException.class); ids.put(154, RetentionLeaseNotFoundException.class); + ids.put(155, ShardNotInPrimaryModeException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 396b8429bf9ad..4459aa5556988 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -64,9 +64,11 @@ import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.index.shard.ShardNotInPrimaryModeException; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -423,7 +425,7 @@ protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, final ExecutionException e = expectThrows(ExecutionException.class, listener::get); assertThat(e.getCause(), instanceOf(ReplicationOperation.RetryOnPrimaryException.class)); assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); - assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause().getCause(), instanceOf(ShardNotInPrimaryModeException.class)); assertThat(e.getCause().getCause(), hasToString(containsString("shard is not in primary mode"))); } @@ -1310,6 +1312,7 @@ private IndexService mockIndexService(final IndexMetaData indexMetaData, Cluster private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { final IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(shardId); + when(indexShard.state()).thenReturn(IndexShardState.STARTED); doAnswer(invocation -> { ActionListener callback = (ActionListener) invocation.getArguments()[0]; if (isPrimaryMode.get()) { @@ -1317,7 +1320,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService callback.onResponse(count::decrementAndGet); } else { - callback.onFailure(new IllegalStateException("shard is not in primary mode")); + callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED)); } return null; }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 396c6badc05ef..786d5bc5e8df8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -688,7 +688,7 @@ public void onResponse(final Releasable releasable) { @Override public void onFailure(final Exception e) { - assertThat(e, instanceOf(IllegalStateException.class)); + assertThat(e, instanceOf(ShardNotInPrimaryModeException.class)); assertThat(e, hasToString(containsString("shard is not in primary mode"))); } }, @@ -705,7 +705,7 @@ public void onResponse(final Releasable releasable) { @Override public void onFailure(final Exception e) { - assertThat(e, instanceOf(IllegalStateException.class)); + assertThat(e, instanceOf(ShardNotInPrimaryModeException.class)); assertThat(e, hasToString(containsString("shard is not in primary mode"))); latch.countDown(); } @@ -1730,7 +1730,7 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { recoveryThread.join(); assertTrue(shard.isRelocatedPrimary()); final ExecutionException e = expectThrows(ExecutionException.class, () -> acquirePrimaryOperationPermitBlockingly(shard)); - assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); closeShards(shard); @@ -1783,14 +1783,14 @@ public void onFailure(Exception e) { onLockAcquired = new PlainActionFuture<>(); assertion = () -> { final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); - assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); }; } else { onLockAcquired = new PlainActionFuture<>(); assertion = () -> { final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS)); - assertThat(e.getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause(), instanceOf(ShardNotInPrimaryModeException.class)); assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode"))); }; } From f38d926ebd1f99baf93f5cbedddda9baefa821b8 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 May 2019 11:45:41 -0400 Subject: [PATCH 9/9] Remove == false --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ba087c4220f97..da5ee8f8363ff 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2520,11 +2520,11 @@ private ActionListener wrapPrimaryOperationPermitListener(final Acti return ActionListener.delegateFailure( listener, (l, r) -> { - if (replicationTracker.isPrimaryMode() == false) { + if (replicationTracker.isPrimaryMode()) { + l.onResponse(r); + } else { r.close(); l.onFailure(new ShardNotInPrimaryModeException(shardId, state)); - } else { - l.onResponse(r); } }); }