CCR should auto-retry rejected execution exceptions (elastic#49213)
If CCR encounters a rejected execution exception, today we treat this as
fatal. It is not, though, as the stuffed queue could drain. Requiring an
administrator to manually restart the follow tasks that faced such an
exception is a burden. This commit addresses this by making CCR
auto-retry on rejected execution exceptions.
jasontedor committed Nov 17, 2019
1 parent 5395782 commit d88965a
Showing 3 changed files with 27 additions and 9 deletions.
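
The gist of the change: a rejected execution is now classified as a retryable failure, so the follow task backs off and tries again instead of failing permanently. Below is a minimal, self-contained sketch of that classify-and-back-off pattern; it uses the JDK's RejectedExecutionException in place of Elasticsearch's EsRejectedExecutionException, and the helper names (isRetryable, backOffMillis) are illustrative rather than the actual ShardFollowNodeTask API.

import java.util.concurrent.RejectedExecutionException;

// Sketch only: classify a failure as retryable and compute a capped exponential
// back-off, mirroring the behaviour the commit message describes (hypothetical names).
public final class RejectedExecutionRetrySketch {

    // Treat a rejected execution as transient rather than fatal: the stuffed queue may drain.
    static boolean isRetryable(final Exception e) {
        return e instanceof RejectedExecutionException;
    }

    // Exponential back-off (50 ms, 100 ms, 200 ms, ...) capped at maxRetryDelayInMillis.
    static long backOffMillis(final int currentRetry, final long maxRetryDelayInMillis) {
        final long delay = 50L << Math.min(Math.max(currentRetry - 1, 0), 20);
        return Math.min(delay, maxRetryDelayInMillis);
    }

    public static void main(final String[] args) {
        final Exception failure = new RejectedExecutionException("write queue is full");
        if (isRetryable(failure)) {
            System.out.println("retrying in " + backOffMillis(3, 500) + " ms"); // prints "retrying in 200 ms"
        } else {
            System.out.println("fatal: " + failure);
        }
    }
}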
File 1 of 3
@@ -21,6 +21,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
@@ -455,7 +456,7 @@ private void updateSettings(final LongConsumer handler, final AtomicInteger retr

private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null;
-if (shouldRetry(params.getRemoteCluster(), e)) {
+if (shouldRetry(e)) {
if (isStopped() == false) {
// Only retry if the shard follow task is not stopped.
int currentRetry = retryCounter.incrementAndGet();
@@ -484,7 +485,7 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
return Math.min(backOffDelay, maxRetryDelayInMillis);
}

-static boolean shouldRetry(String remoteCluster, Exception e) {
+static boolean shouldRetry(final Exception e) {
if (NetworkExceptionHelper.isConnectException(e)) {
return true;
} else if (NetworkExceptionHelper.isCloseConnectionException(e)) {
@@ -503,7 +504,8 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
actual instanceof ConnectTransportException ||
actual instanceof NodeClosedException ||
actual instanceof NoSuchRemoteClusterException ||
-(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed"));
+(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) ||
+actual instanceof EsRejectedExecutionException;
}

// These methods are protected for testing purposes:
File 2 of 3
@@ -396,7 +396,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll
return;
}

-if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) {
+if (ShardFollowNodeTask.shouldRetry(e)) {
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e);
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);
File 3 of 3
@@ -12,6 +12,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -255,8 +256,16 @@ public void testReceiveRetryableError() {
startTask(task, 63, -1);

int max = randomIntBetween(1, 30);
+final Exception[] exceptions = new Exception[max];
for (int i = 0; i < max; i++) {
-readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
+final Exception exception;
+if (randomBoolean()) {
+exception = new ShardNotFoundException(new ShardId("leader_index", "", 0));
+} else {
+exception = new EsRejectedExecutionException("leader_index rejected");
+}
+exceptions[i] = exception;
+readFailures.add(exception);
}
mappingVersions.add(1L);
leaderGlobalCheckpoints.add(63L);
@@ -272,10 +281,17 @@ public void testReceiveRetryableError() {
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.readExceptions().entrySet().iterator().next();
assertThat(entry.getValue().v1(), equalTo(Math.toIntExact(retryCounter.get())));
assertThat(entry.getKey(), equalTo(0L));
-assertThat(entry.getValue().v2(), instanceOf(ShardNotFoundException.class));
-final ShardNotFoundException shardNotFoundException = (ShardNotFoundException) entry.getValue().v2();
-assertThat(shardNotFoundException.getShardId().getIndexName(), equalTo("leader_index"));
-assertThat(shardNotFoundException.getShardId().getId(), equalTo(0));
+if (exceptions[Math.toIntExact(retryCounter.get()) - 1] instanceof ShardNotFoundException) {
+assertThat(entry.getValue().v2(), instanceOf(ShardNotFoundException.class));
+final ShardNotFoundException shardNotFoundException = (ShardNotFoundException) entry.getValue().v2();
+assertThat(shardNotFoundException.getShardId().getIndexName(), equalTo("leader_index"));
+assertThat(shardNotFoundException.getShardId().getId(), equalTo(0));
+} else {
+assertThat(entry.getValue().v2().getCause(), instanceOf(EsRejectedExecutionException.class));
+final EsRejectedExecutionException rejectedExecutionException =
+(EsRejectedExecutionException) entry.getValue().v2().getCause();
+assertThat(rejectedExecutionException.getMessage(), equalTo("leader_index rejected"));
+}
}
retryCounter.incrementAndGet();
};
