Block older operations on primary term transition #24779

Merged
jasontedor merged 8 commits into elastic:master from the block-party branch
May 19, 2017

Conversation

jasontedor
Member

Today a replica learns of a new primary term via a cluster state update and there is not a clean transition between the older primary term and the newer primary term. This commit modifies this situation so that:

  • a replica shard learns of a new primary term via replication operations executed under the mandate of the new primary
  • when a replica shard learns of a new primary term, it blocks operations on older terms from reaching the engine, with a clear transition point between the operations on the older term and the operations on the newer term

This work paves the way for a primary/replica sync on primary promotion. Future work will also ensure a clean transition point on a promoted primary, and prepare a replica shard for a sync with the promoted primary.
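The behavior described above can be sketched roughly as follows. This is a minimal, single-threaded model and not the actual IndexShard code: the class `ReplicaTermSketch`, its `apply` method, and the list standing in for the engine are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-threaded model of a replica shard; not the real IndexShard. */
final class ReplicaTermSketch {
    private long primaryTerm;                               // term the replica currently accepts
    private final List<String> engine = new ArrayList<>();  // stands in for the engine

    ReplicaTermSketch(long initialTerm) {
        this.primaryTerm = initialTerm;
    }

    /** Applies an operation, or rejects it if it carries an older term. */
    boolean apply(long operationPrimaryTerm, String op) {
        if (operationPrimaryTerm < primaryTerm) {
            return false; // stale: issued under a primary term that has been superseded
        }
        if (operationPrimaryTerm > primaryTerm) {
            // Transition point: the replica learns the new term from the operation itself.
            // The real change also drains in-flight operations first; that is omitted here.
            primaryTerm = operationPrimaryTerm;
        }
        engine.add(operationPrimaryTerm + ":" + op);
        return true;
    }

    long primaryTerm() {
        return primaryTerm;
    }

    List<String> engine() {
        return engine;
    }
}
```

In this model, once an operation on term 2 has been applied, a late operation on term 1 never reaches the engine.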

Relates #10708

@jasontedor jasontedor force-pushed the block-party branch 2 times, most recently from 64bc8c3 to 857fcdb Compare May 19, 2017 02:09
throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
    shardId, opPrimaryTerm, primaryTerm));
if (operationPrimaryTerm > primaryTerm
        && pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) {
Contributor

If there are many incoming operations with a higher term, each one of them will go into this branch and invoke blockOperations (until one completes). This can create additional contention when the first blockOperations call has completed and subsequent operations unnecessarily call blockOperations. I've adapted your code in 7ff4a7c so that only the first operation with a higher term calls blockOperations.
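One way to let only the first operation with a higher term trigger the block is to compare against the previous pending value instead of the updated one. The sketch below is an illustration under that assumption; it is not necessarily what 7ff4a7c does, and `PendingTermSketch` and `shouldBlock` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper; only the pendingPrimaryTerm name comes from the quoted diff. */
final class PendingTermSketch {
    private final AtomicLong pendingPrimaryTerm;

    PendingTermSketch(long initialTerm) {
        this.pendingPrimaryTerm = new AtomicLong(initialTerm);
    }

    /** Returns true only for the caller whose term actually raised the pending term. */
    boolean shouldBlock(long operationPrimaryTerm) {
        // getAndAccumulate returns the value held before the update was applied.
        final long previous = pendingPrimaryTerm.getAndAccumulate(operationPrimaryTerm, Math::max);
        return previous < operationPrimaryTerm;
    }
}
```

By contrast, a check of the form `accumulateAndGet(term, Math::max) == term` is also true for every later operation that carries the same term, which is the extra contention described in the comment.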

Contributor

This solution has the following problem: if primaryTerm == 0 and an operation comes in with operationPrimaryTerm == 1, then another operation comes in with operationPrimaryTerm == 2, and then another operation comes in with operationPrimaryTerm == 1, it may be that the last operation is processed before primaryTerm was incremented to 1 (or 2). This can happen if the first operation passed the check but didn't submit its block. The second operation incremented pendingPrimaryTerm but didn't submit its block either, and then the third operation just passes through without waiting.

Contributor

@bleskes bleskes May 19, 2017

And now I see that we guard against it with if (operationPrimaryTerm == currentPrimaryTerm) later on, so the third operation will be failed, but with the wrong message (we will say it's too old and give a currentPrimaryTerm of 0 while the operation's term is 1). I think this is all just too complex and isn't worth it given how rare primary promotions are.
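The interleaving discussed in this thread can be replayed deterministically. The sketch below runs the three checks in sequence on a single thread, with no block ever submitted, which models the window the comments describe. The field names mirror the quoted diff; the class `TermRaceReplay` and the method `wouldBlock` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single-threaded replay of the check from the quoted diff. */
final class TermRaceReplay {
    long primaryTerm = 0; // never bumped here, because no block has run yet
    final AtomicLong pendingPrimaryTerm = new AtomicLong(0);

    /** Same condition as the quoted diff: should this operation submit a block? */
    boolean wouldBlock(long operationPrimaryTerm) {
        return operationPrimaryTerm > primaryTerm
            && pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm;
    }
}
```

Replaying terms 1, 2, 1 in that order: the first two calls return true, the third returns false because the pending term is already 2, and primaryTerm is still 0 at that point. The third operation therefore skips the block while the shard has not yet moved off term 0.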

Contributor

@bleskes bleskes left a comment

Looks great. I think the main discussion point is the concurrency control. I will reach out to discuss in another channel.

@@ -180,7 +178,7 @@ protected void resolveRequest(final IndexMetaData indexMetaData, final Request r

/**
* Synchronous replica operation on nodes with replica copies. This is done under the lock form
Contributor

nit: this is done while having (under?) a permit

        && pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) {
    try {
        indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
            if (operationPrimaryTerm > primaryTerm) {
Contributor

Can you add a comment as to how it's possible that the term will not be higher (i.e. a race condition between checking pendingPrimaryTerm and submitting the blockOperations)?
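A small sketch of why the term is re-checked inside the blocked section: two callers can both pass an outer check, but their callbacks run one after the other, so the second may find the term already raised. Here a synchronized method stands in for blockOperations, and `RecheckUnderBlockSketch`, `maybeBump`, and `bumps` are hypothetical names.

```java
/** Hypothetical model of re-checking the term once operations are blocked. */
final class RecheckUnderBlockSketch {
    private long primaryTerm = 0;
    private int bumps = 0;

    // Stand-in for blockOperations: the callback runs while nothing else can run.
    private synchronized void blockOperations(Runnable onBlocked) {
        onBlocked.run();
    }

    /** Called by an operation that already passed an outer "term is higher" check. */
    void maybeBump(long operationPrimaryTerm) {
        blockOperations(() -> {
            // Another operation may have raised the term between the caller's outer
            // check and this callback running, so the comparison is repeated here.
            if (operationPrimaryTerm > primaryTerm) {
                primaryTerm = operationPrimaryTerm;
                bumps++;
            }
        });
    }

    long primaryTerm() {
        return primaryTerm;
    }

    int bumps() {
        return bumps;
    }
}
```

If a caller with term 2 runs its callback first, a caller with term 1 that passed its outer check earlier finds nothing left to do, and the term never moves backwards.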

public void onResponse(final Releasable releasable) {
    assert operationPrimaryTerm <= primaryTerm
        : "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]";
    if (operationPrimaryTerm < primaryTerm) {
Contributor

add a comment please on how this can happen...

*/
public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) throws InterruptedException, TimeoutException {
    if (closed) {
        throw new IndexShardClosedException(shardId);
    }
    try {
        if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
            assert semaphore.availablePermits() == 0;
Contributor

++
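For readers unfamiliar with the pattern in the quoted hunk, here is a minimal sketch of blocking all operations by acquiring every permit of a semaphore. It is a simplified stand-in and not the real IndexShardOperationPermits: the class name, `tryAcquireOperation`, `releaseOperation`, and the value chosen for TOTAL_PERMITS are assumptions.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical, simplified permit holder built on java.util.concurrent.Semaphore. */
final class OperationPermitsSketch {
    static final int TOTAL_PERMITS = Integer.MAX_VALUE; // assumed value
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);

    /** Takes one permit for a regular operation; false while operations are blocked. */
    boolean tryAcquireOperation() {
        return semaphore.tryAcquire();
    }

    void releaseOperation() {
        semaphore.release();
    }

    /** Waits for in-flight operations to finish, then runs onBlocked with none running. */
    void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked)
            throws InterruptedException, TimeoutException {
        if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
            try {
                // Holding every permit means no operation can hold one.
                onBlocked.run();
            } finally {
                semaphore.release(TOTAL_PERMITS);
            }
        } else {
            throw new TimeoutException("timed out waiting for in-flight operations");
        }
    }
}
```

Because the blocking caller holds all permits, the assertion in the hunk (`availablePermits() == 0`) holds for the duration of the callback.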

ensureYellow();

// this forces the primary term to propagate to the replicas
client().index(new IndexRequest("test", "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON)).get();
Contributor

how do we make sure we change it/only do it sometimes once we can?

@@ -561,6 +561,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
final Set<String> initializingIds =
allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
Contributor

I'm wondering if we should move this if clause to before we update the routing entry. @ywelsch this class is your baby, any thoughts?

Member Author

Let's discuss this separately and proceed in a follow-up if needed.

ThreadPool.Names.INDEX);
};

final Thread first = new Thread(function.apply(randomBoolean()));
Contributor

can we lock down the expected end term based on these booleans and assert for that?

@Override
public void onResponse(Releasable releasable) {
    counter.incrementAndGet();
    latch.countDown();
Contributor

can we add a check on the term here?
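A term check of the kind requested here might look like the sketch below. It models only the term bookkeeping with an atomic counter and does not touch a real shard; `TermAssertionSketch` and `finalTermAfter` are hypothetical names, and the expected end term is assumed to be the maximum of the submitted terms.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical test helper; it models term bookkeeping only, not a real shard. */
final class TermAssertionSketch {
    /** Submits one operation per term from its own thread and returns the final term. */
    static long finalTermAfter(long... operationTerms) {
        final AtomicLong primaryTerm = new AtomicLong(0);
        final AtomicInteger counter = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(operationTerms.length);
        for (final long term : operationTerms) {
            new Thread(() -> {
                primaryTerm.accumulateAndGet(term, Math::max); // stand-in for the term bump
                counter.incrementAndGet();
                latch.countDown();
            }).start();
        }
        try {
            latch.await(); // wait until every operation has reported back
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (counter.get() != operationTerms.length) {
            throw new AssertionError("not every operation completed");
        }
        return primaryTerm.get();
    }
}
```

A test can then compute the expected end term up front and compare it with the returned value, whatever order the threads happen to run in.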

Contributor

@bleskes bleskes left a comment

LGTM. Thanks @jasontedor

@jasontedor jasontedor merged commit 4cd70cf into elastic:master May 19, 2017
@jasontedor jasontedor deleted the block-party branch May 19, 2017 20:18
bleskes added a commit that referenced this pull request May 24, 2017
With #24779 in place, we can now guarantee that a single translog generation file will never have a sequence number conflict that needs to be resolved by looking at primary terms. These conflicts can occur when a replica contains an operation which isn't part of the history of a newly promoted primary. That primary can then assign a different operation to the same slot and replicate it to the replica.

PS. Knowing that each generation file is conflict free will simplify repairing these conflicts when we read from the translog.

PPS. This PR also fixes some bugs in the piping of primary terms in the bulk shard action. These bugs are a result of the legacy of IndexRequest/DeleteRequest being a ReplicationRequest. We need to change that as a follow up.

Relates to #10708
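To make the kind of conflict in this commit message concrete, the sketch below resolves two operations that share a sequence number by keeping the one with the higher primary term. This is an illustration under that assumption, not the translog's actual reading code; `SeqNoConflictSketch`, `Op`, and `payloadAt` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of resolving sequence number conflicts by primary term. */
final class SeqNoConflictSketch {
    static final class Op {
        final long seqNo;
        final long primaryTerm;
        final String payload;

        Op(long seqNo, long primaryTerm, String payload) {
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
            this.payload = payload;
        }
    }

    private final Map<Long, Op> bySeqNo = new HashMap<>();

    /** Records an operation; on a sequence number clash the higher term is kept. */
    void add(Op op) {
        bySeqNo.merge(op.seqNo, op,
            (existing, incoming) -> incoming.primaryTerm >= existing.primaryTerm ? incoming : existing);
    }

    /** Returns the payload stored at the given sequence number, or null if none. */
    String payloadAt(long seqNo) {
        final Op op = bySeqNo.get(seqNo);
        return op == null ? null : op.payload;
    }
}
```

Here an operation left over from the old primary at a given slot is replaced by the one the newly promoted primary assigned to the same slot, and a late copy of the old operation does not overwrite it.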
@clintongormley clintongormley added the :Distributed/Engine Anything around managing Lucene and the Translog in an open shard. label Feb 14, 2018
Labels: :Distributed/Engine (Anything around managing Lucene and the Translog in an open shard), >enhancement, v6.0.0-beta1