-
Notifications
You must be signed in to change notification settings - Fork 24.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Combine the execution of an exclusive replica operation with primary term update #36116
Combine the execution of an exclusive replica operation with primary term update #36116
Conversation
Pinging @elastic/es-distributed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if there are more races here (needs a better test I think). Assume IndexShard is currently on term 1 and you have two consecutive invocations of acquireAllReplicaOperationsPermits
with term 2. The first invocation will bump the pendingPrimaryTerm to 2 and combine the execution of the all permit acquisition with the term bumping. The second invocation however will not do so, because pendingPrimaryTerm has been set to 2 at that point. The second invocation of asyncBlockOperations
will still race against the asyncBlockOperations
of the term bumping though. I think we need to combine these in slightly different ways and also need more tests that concurrently invoke things here.
void onPrimaryTermUpdate() throws Exception; | ||
|
||
@Override | ||
default void onResponse(final Releasable releasable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do this have a default implementation? It's never used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used when IndexShard.updateShardState()
calls bumpPrimaryTerm()
to update the term.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I missed that there was a second caller of bumpPrimaryTerm that only implements onPrimaryTermUpdate
Thanks for your feedback @ywelsch, always helpful. At this stage I think we could simply always combine the primary term bump with the all permits operation execution when the operation's primary term is greater than the shard's pending primary term OR if the operation's primary term is greater than the shard's current primary term and this is a "all permits" operation. Then we rely on bumpPrimaryTerm() which already guards against concurrent primary term updates. What do you think? |
I pushed 1f875b0 to illustrate the previous comment. |
yeah, I had the same kind of thing in mind. Can you add more tests that concurrently bump terms (similar to the situation described above) and that would have possibly failed without your last patch? |
I updated in 587e074 the test added in 1f875b0 to make it more likely to fail. It tests the situation you described, multiple concurrent invocations of acquireAllReplicaOperationsPermits that require a term update. The tests fails 4 times out of 10 when my last change in IndexShard is not applied. I think this test is enough but let me know if you still want more tests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left a few more comments and suggestions. Thanks for the extra test.
void onPrimaryTermUpdate() throws Exception; | ||
|
||
@Override | ||
default void onResponse(final Releasable releasable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I missed that there was a second caller of bumpPrimaryTerm that only implements onPrimaryTermUpdate
} catch (final Exception e) { | ||
onFailure(e); | ||
} finally { | ||
if (success == false) { | ||
releaseOnce.run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any particular reason to run release after the onFailure? With the try-with-resources, the release was done before running the catch block. I would prefer to keep the semantics like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was mostly a safeguard in case the listener throws an exception and didn't close the release itself.
I would prefer to keep the semantics like that.
+1
} | ||
listener.onResponse(releaseOnce::run); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think a failure in this method should also fail the shard. Even though we are combining two things here, the exception handling should be distinct for both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed
final long globalCheckpoint = replica.getGlobalCheckpoint(); | ||
final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes(); | ||
|
||
final int operations = scaledRandomIntBetween(10, 64); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need that much concurrency for this to trigger?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It makes the test most likely to fail, at least on my workstation. But since Jenkins is usually more busy than my box I'll decrease the number of ops here.
@@ -2316,9 +2316,24 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> | |||
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); | |||
} | |||
|
|||
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) { | |||
@FunctionalInterface | |||
private interface PrimaryTermUpdateListener extends ActionListener<Releasable> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find the logic still quite convoluted with this interface. I wonder if it's easier to optionally add the ActionListener with which to combine the term bump into the bumpPrimaryTerm method. Here's my try at this. Let me know what you prefer.
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index ad06d8449a0..d8299cab5a5 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -48,6 +48,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -548,7 +549,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} catch (final AlreadyClosedException e) {
// okay, the index was deleted
}
- });
+ }, null);
}
}
// set this last, once we finished updating all internal state.
@@ -2316,22 +2317,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}
- @FunctionalInterface
- private interface PrimaryTermUpdateListener extends ActionListener<Releasable> {
-
- void onPrimaryTermUpdate() throws Exception;
-
- @Override
- default void onResponse(final Releasable releasable) {
- Releasables.close(releasable);
- }
-
- @Override
- default void onFailure(final Exception e) {
- }
- }
-
- private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) {
+ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked,
+ @Nullable ActionListener<Releasable> combineWithAction) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm >= pendingPrimaryTerm;
assert operationPrimaryTerm <= pendingPrimaryTerm;
@@ -2339,19 +2326,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onFailure(final Exception e) {
+ try {
+ innerFail(e);
+ } finally {
+ if (combineWithAction != null) {
+ combineWithAction.onFailure(e);
+ }
+ }
+ }
+
+ private void innerFail(final Exception e) {
try {
failShard("exception during primary term transition", e);
} catch (AlreadyClosedException ace) {
// ignore, shard is already closed
- } finally {
- listener.onFailure(e);
}
}
@Override
public void onResponse(final Releasable releasable) {
final RunOnce releaseOnce = new RunOnce(releasable::close);
- boolean success = false;
try {
assert operationPrimaryTerm <= pendingPrimaryTerm;
termUpdated.await();
@@ -2359,14 +2353,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// in the order submitted. We need to guard against another term bump
if (operationPrimaryTerm < newPrimaryTerm) {
operationPrimaryTerm = newPrimaryTerm;
- listener.onPrimaryTermUpdate();
+ onBlocked.run();
}
- listener.onResponse(releaseOnce::run);
- success = true;
} catch (final Exception e) {
- onFailure(e);
+ if (combineWithAction == null) {
+ // otherwise leave it to combineWithAction to release the permit
+ releaseOnce.run();
+ }
+ innerFail(e);
} finally {
- if (success == false) {
+ if (combineWithAction != null) {
+ combineWithAction.onResponse(releasable);
+ } else {
releaseOnce.run();
}
}
@@ -2480,37 +2478,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
throw new IndexShardNotStartedException(shardId, shardState);
}
- bumpPrimaryTerm(opPrimaryTerm, new PrimaryTermUpdateListener() {
- @Override
- public void onPrimaryTermUpdate() throws Exception {
- updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
- final long currentGlobalCheckpoint = getGlobalCheckpoint();
- final long maxSeqNo = seqNoStats().getMaxSeqNo();
- logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
- opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
- if (currentGlobalCheckpoint < maxSeqNo) {
- resetEngineToGlobalCheckpoint();
- } else {
- getEngine().rollTranslogGeneration();
- }
+ bumpPrimaryTerm(opPrimaryTerm, () -> {
+ updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
+ final long currentGlobalCheckpoint = getGlobalCheckpoint();
+ final long maxSeqNo = seqNoStats().getMaxSeqNo();
+ logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
+ opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
+ if (currentGlobalCheckpoint < maxSeqNo) {
+ resetEngineToGlobalCheckpoint();
+ } else {
+ getEngine().rollTranslogGeneration();
}
-
- @Override
- public void onResponse(final Releasable releasable) {
- if (allowCombineOperationWithPrimaryTermUpdate) {
- operationListener.onResponse(releasable);
- } else {
- Releasables.close(releasable);
- }
- }
-
- @Override
- public void onFailure(Exception e) {
- if (allowCombineOperationWithPrimaryTermUpdate) {
- operationListener.onFailure(e);
- }
- }
- });
+ }, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null);
if (allowCombineOperationWithPrimaryTermUpdate) {
logger.debug("operation execution has been combined with primary term update");
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is indeed much clearer, thanks. I integrated it as it is.
@ywelsch code updated again, can you have another look please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
assert Thread.holdsLock(mutex); | ||
assert newPrimaryTerm > pendingPrimaryTerm; | ||
assert newPrimaryTerm >= pendingPrimaryTerm; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe strengthen this assertion along the ways of
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's also more explicit
Thanks @ywelsch |
…term update (#36116) This commit changes how an operation which requires all index shard operations permits is executed when a primary term update is required: the operation and the update are combined so that the operation is executed after the primary term update under the same blocking operation. Closes #35850 Co-authored-by: Yannick Welsch <yannick@welsch.lu>
This pull request changes how an operation which requires all index shard operations permits is executed when a primary term update is required: the operation and the update are combined so that the operation is executed after the primary term update under the same blocking operation.
This change is a fix for #35850 after a suggestion from @ywelsch. It introduces a functional interface
PrimaryTermUpdateListener
that is also an ActionListener and that can be used to combine both operations (the primary term update and the operation logic) under the same listener.Closes #35850