Guarantee that translog generations are seqNo conflict free #24825

Merged (19 commits), May 24, 2017

Contributor

@bleskes bleskes commented May 22, 2017

With #24779 in place, we can now guarantee that a single translog generation file will never have a sequence number conflict that needs to be resolved by looking at primary terms. These conflicts can occur when a replica contains an operation which isn't part of the history of a newly promoted primary. That primary can then assign a different operation to the same slot and replicate it to the replica.

PS. Knowing that each generation file is conflict free will simplify repairing these conflicts when we read from the translog.

PPS. This PR also fixes some bugs in the piping of primary terms in the bulk shard action. These bugs are a result of the legacy of IndexRequest/DeleteRequest being a ReplicationRequest. We need to change that as a follow-up.

Relates to #10708
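
To make the conflict in the description concrete, here is a minimal sketch of two operations sharing one sequence number and being told apart by primary term. The `Op` and `SeqNoConflicts` classes are hypothetical stand-ins, not Elasticsearch types, and the rule "the highest primary term wins" is an assumption made for illustration; the PR itself only says that such conflicts are resolved by looking at primary terms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified stand-in for a translog operation (not the Elasticsearch class). */
final class Op {
    final long seqNo;
    final long primaryTerm;
    final String payload;

    Op(long seqNo, long primaryTerm, String payload) {
        this.seqNo = seqNo;
        this.primaryTerm = primaryTerm;
        this.payload = payload;
    }
}

final class SeqNoConflicts {
    /**
     * Assumed rule for this sketch: for each sequence number, keep the operation
     * that carries the highest primary term. The result is ordered by sequence number.
     */
    static List<Op> resolveByPrimaryTerm(List<Op> ops) {
        Map<Long, Op> bySeqNo = new TreeMap<>();
        for (Op op : ops) {
            // merge() hands us the operation already stored and the new candidate.
            bySeqNo.merge(op.seqNo, op,
                (kept, candidate) -> candidate.primaryTerm > kept.primaryTerm ? candidate : kept);
        }
        return new ArrayList<>(bySeqNo.values());
    }
}
```

Feeding this a log that holds an operation at seqNo 1 from term 1 and a different operation at seqNo 1 from term 2 keeps only the term 2 entry. If every generation file is conflict free, a lookup like this is never needed within a single file, only across files.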

Contributor

@s1monw s1monw left a comment

Left some minor comments; LGTM in general.

@@ -1260,9 +1263,16 @@ public int hashCode() {
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
Contributor

this hashcode looks odd....

Contributor Author

agreed. I generated a new one.

@@ -90,6 +96,13 @@ private TranslogWriter(
assert initialCheckpoint.maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED : initialCheckpoint.maxSeqNo;
this.maxSeqNo = initialCheckpoint.maxSeqNo;
this.globalCheckpointSupplier = globalCheckpointSupplier;
boolean assertionsEnabled = false;
Contributor

assert (seenSequenceNumbers = new HashMap<>()) != null; if you wanna have a one-liner?

Contributor Author

then I think I can't make seenSequenceNumbers final which is a shame...

Member

Please integrate #24834 and use it here.

Contributor Author

integrated. Nice work.
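
The idiom this thread revolves around can be sketched as follows: a local flag is flipped by a side effect inside an `assert`, so an assertion-only data structure can still be stored in a `final` field. `AssertionProbe` and its `seen` map are hypothetical names for illustration; this does not show what #24834 provides.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical class that only allocates tracking state when assertions are on. */
final class AssertionProbe {
    /** Non-null only when the JVM runs this class with assertions enabled (-ea). */
    private final Map<Long, String> seen;

    AssertionProbe() {
        boolean assertionsEnabled = false;
        // The assignment is evaluated only when assertions are enabled;
        // with assertions disabled the whole statement is skipped.
        assert assertionsEnabled = true;
        this.seen = assertionsEnabled ? new HashMap<>() : null;
    }

    /** True when the tracking map was allocated, i.e. assertions are enabled. */
    boolean tracking() {
        return seen != null;
    }
}
```

The one-liner suggested above, `assert (seenSequenceNumbers = new HashMap<>()) != null;`, assigns the field inside the assertion itself, which is why the field could no longer be `final`.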

DocWriteResponse primaryResponse,
IndexRequest request,
IndexShard replica) throws IOException {
DocWriteResponse primaryResponse,
Contributor

can you maybe un-indent this into a single line?

try {
verifyReplicationTarget();
assert primaryTerm == this.primaryTerm : "op term [ " + primaryTerm + " ] != shard term [" + this.primaryTerm + "]";
Contributor

should this be a hard exception? something is bloody wrong here if this happens? maybe IllegalStateException?

Contributor Author

this one is an interesting one - yes things are totally fucked up. I'm torn as to whether to change it - with assertions, nodes die / tests fail with the right exception as we never catch them. An IllegalStateException will cause the shard to be failed and then the failure may cascade further. I'm tempted to keep as is.

Member

I think this should be kept as is exactly for the reason that @bleskes mentions regarding the consequences in production code of changing this to a hard failure and the possible loss of visibility when tests are running. This really should never happen.
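
The trade-off discussed in this thread rests on a plain Java fact: `AssertionError` is an `Error`, so a `catch (Exception e)` handler does not intercept it, whereas `IllegalStateException` is caught like any other exception. The sketch below shows just that; `FailureModes.runGuarded` is a hypothetical stand-in for a catch-all handler, and the errors are thrown explicitly so the behavior does not depend on the `-ea` flag.

```java
/** Hypothetical stand-in for code that wraps an operation in a catch-all handler. */
final class FailureModes {
    /**
     * Runs the action and converts any Exception into a "handled" result,
     * loosely mimicking a handler that would react by failing a shard.
     * Errors (including AssertionError) are not caught and propagate to the caller.
     */
    static String runGuarded(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (Exception e) {
            return "handled: " + e.getClass().getSimpleName();
        }
    }
}
```

Calling `runGuarded` with an action that throws `IllegalStateException` returns a "handled" string, while an action that throws `AssertionError` escapes the handler and reaches the caller, which in a test run surfaces the original failure directly.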

@@ -597,6 +598,7 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm,
long version, VersionType versionType) {
verifyReplicationTarget();
assert primaryTerm == this.primaryTerm : "op term [ " + primaryTerm + " ] != shard term [" + this.primaryTerm + "]";
Contributor

same here?

@@ -1879,8 +1881,9 @@ public void acquireReplicaOperationPermit(
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm > primaryTerm;
Contributor

can you add a message to this assertion while you are at it?

Contributor Author

yep

return new Translog.Location(generation, offset, data.length());
}

private boolean assertSeqNoNotSeen(long seqNo, BytesReference data) throws IOException {
Contributor

I know it's only called in one place but can you make it synchronized just for readability?

Contributor Author

sure

Contributor

@ywelsch ywelsch left a comment

I've left minor comments around primary term but LGTM o.w.

@@ -1257,12 +1260,22 @@ public boolean equals(Object obj) {

@Override
public int hashCode() {
return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
int result = (int) (seqNo ^ (seqNo >>> 32));
result = 31 * result + (int) (primaryTerm ^ (primaryTerm >>> 32));
Contributor

result = 31 * result + Long.hashCode(primaryTerm); is simpler (same for seqNo above)

another alternative would be to use Objects.hash(seqNo, primaryTerm, reason)

either way, we can avoid the bit-fiddling.

Contributor Author

I just let IntelliJ autogen it.

});
} catch (final InterruptedException | TimeoutException e) {
} catch (final InterruptedException | TimeoutException | IOException | AlreadyClosedException e) {
Contributor

maybe just catch (Exception)

Contributor Author

yeah, this just got out of hand.

@@ -522,10 +522,11 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
}
}

public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long version, VersionType versionType, long autoGeneratedIdTimestamp,
boolean isRetry) {
public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long primaryTerm, long version, VersionType versionType,
Contributor

Calling this just primaryTerm is confusing (in light of a future PR that uses this code during resync). Here it means the term of the primary that was sending this operation, not necessarily the primary term of the log entry that is being replicated.

Member

I agree.

Contributor Author

"Here it means the term of the primary that was sending this operation, not necessarily the primary term of the log entry that is being replicated"

Actually it's the term of the op in the log and not the authority of the primary that sends this op (that one goes into the permit method). I guess this proves your point about being confusing. Any suggestion?

Contributor Author

I spoke to Yannick on a different channel. We decided to rename the parameter to opPrimaryTerm and relax the assertion to reflect semantics (as opposed to current usage).

@@ -667,7 +669,7 @@ static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest r
final long version = primaryResponse.getVersion();
assert versionType.validateVersionForWrites(version);
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
primaryResponse.getSeqNo(), request.primaryTerm(), version, versionType);
primaryResponse.getSeqNo(), primaryTerm, version, versionType);
Contributor

It's trappy that the IndexRequest/DeleteRequest object might not have a primaryTerm properly set if it's wrapped in a BulkShardRequest. Maybe we could override BulkShardRequest.primaryTerm(long) to set the primary term of the inner objects until we have fixed this discrepancy?
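
A rough sketch of the stop-gap suggested in this comment: the wrapping request overrides its primary-term setter and forwards the value to every inner request. All three classes below are hypothetical, heavily simplified stand-ins; the real `BulkShardRequest`, `IndexRequest` and `DeleteRequest` carry far more state, and nothing in this conversation confirms that the suggestion was adopted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a replication request that carries a primary term. */
class TermCarryingRequest {
    private long primaryTerm;

    void primaryTerm(long term) {
        this.primaryTerm = term;
    }

    long primaryTerm() {
        return primaryTerm;
    }
}

/** Hypothetical stand-in for an inner IndexRequest or DeleteRequest. */
final class ItemRequestSketch extends TermCarryingRequest {
}

/** Hypothetical stand-in for a bulk shard request wrapping several item requests. */
final class BulkShardRequestSketch extends TermCarryingRequest {
    private final List<ItemRequestSketch> items = new ArrayList<>();

    void add(ItemRequestSketch item) {
        items.add(item);
    }

    List<ItemRequestSketch> items() {
        return items;
    }

    /** Sets the term on the wrapper and pushes the same value down to every item. */
    @Override
    void primaryTerm(long term) {
        super.primaryTerm(term);
        for (ItemRequestSketch item : items) {
            item.primaryTerm(term);
        }
    }
}
```

With this shape, a caller reading `request.primaryTerm()` from an inner item sees the same value as on the wrapper, provided the term is set after the items were added.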

Member

@jasontedor jasontedor left a comment

I left some comments.

return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason().hashCode();
int result = (int) (seqNo ^ (seqNo >>> 32));
result = 31 * result + (int) (primaryTerm ^ (primaryTerm >>> 32));
result = 31 * result + reason.hashCode();
Member

Why change this? It was perfectly fine as it was, and far more readable. Note that the previous calculation was simply the result of expanding the usual hash code calculation:

accumulator = 1
for (o in objects) {
  accumulator = 31 * accumulator + hash(o)
}
return accumulator

So you end up with 31^k + 31^(k - 1) * hash_1 + 31^(k - 2) * hash_2 + ... + 31^0 * hash_k, which is why it looks like 31^3 + ...

Contributor Author

@bleskes bleskes May 23, 2017

I can roll it back. I just moved it to be the standard autogen-ed intellij code, which is what we see all over the place.

Contributor Author

I'm just going to roll this back. It's not related to my change and it seems it's contentious.
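
The expansion described in this thread can be checked mechanically. The sketch below computes the hash three ways for three fields: the expanded polynomial from the original code, the accumulator loop unrolled by hand, and `java.util.Objects.hash`, which uses the same seed of 1 and multiplier of 31. `HashExpansion` is a hypothetical helper, and `reason` is assumed to be a `String` for the purpose of the example.

```java
import java.util.Objects;

/** Hypothetical helper comparing equivalent ways of combining three field hashes. */
final class HashExpansion {
    /** The expanded form: 31^3 + 31^2 * h1 + 31 * h2 + h3 (int arithmetic wraps on overflow). */
    static int expanded(long seqNo, long primaryTerm, String reason) {
        return 31 * 31 * 31 + 31 * 31 * Long.hashCode(seqNo) + 31 * Long.hashCode(primaryTerm) + reason.hashCode();
    }

    /** The accumulator form, starting from 1 and folding in one field at a time. */
    static int accumulated(long seqNo, long primaryTerm, String reason) {
        int accumulator = 1;
        accumulator = 31 * accumulator + Long.hashCode(seqNo);
        accumulator = 31 * accumulator + Long.hashCode(primaryTerm);
        accumulator = 31 * accumulator + reason.hashCode();
        return accumulator;
    }

    /** The library form; the longs are boxed, and Long#hashCode matches Long.hashCode(long). */
    static int viaObjectsHash(long seqNo, long primaryTerm, String reason) {
        return Objects.hash(seqNo, primaryTerm, reason);
    }

    /** The bit-fiddling form generated by the IDE is the definition of Long.hashCode(long). */
    static int manualLongHash(long value) {
        return (int) (value ^ (value >>> 32));
    }
}
```

All three combining methods return the same value for any input, because overflow in `int` arithmetic wraps consistently. The IDE-generated version in the diff differs only in that it starts from the first field's hash instead of a seed of 1, so it produces different numbers but is an equally valid hash.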

if (seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// nothing to do
} else if (seenSequenceNumbers.containsKey(seqNo)) {
final Tuple<BytesReference, Exception> previous = seenSequenceNumbers.get(seqNo);
Member

Why not StackTraceElement[]?

Contributor Author

I did it this way so it can easily integrate into a caused by clause and have a proper message. Do you have a better suggestion there?

}
} else {
seenSequenceNumbers.put(seqNo,
new Tuple<>(new BytesArray(data.toBytesRef(), true), new RuntimeException("stack capture previous op")));
Member

Why not Thread.currentThread().getStackTrace()?
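
The reasoning given above for storing an `Exception` rather than a `StackTraceElement[]` can be illustrated with a small sketch: the stored exception is later passed as the cause of the failure, so the stack of the first write shows up in a "Caused by" section. `SeenSeqNoTracker` is a hypothetical, self-contained approximation of the fragments shown in the diff; it uses `byte[]` instead of `BytesReference`, the placeholder constant is an assumption, and allowing an identical re-write of the same sequence number is also an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical approximation of the per-generation duplicate check discussed in the review. */
final class SeenSeqNoTracker {
    /** Placeholder for "no sequence number assigned"; the value is an assumption of this sketch. */
    private static final long UNASSIGNED_SEQ_NO = -2L;

    /** Bytes of the first write plus an exception whose stack trace marks where it happened. */
    private static final class Seen {
        final byte[] data;
        final Exception capturedAt;

        Seen(byte[] data, Exception capturedAt) {
            this.data = data;
            this.capturedAt = capturedAt;
        }
    }

    private final Map<Long, Seen> seen = new HashMap<>();

    /** Always returns true so that it can be used as: assert tracker.assertSeqNoNotSeen(...). */
    synchronized boolean assertSeqNoNotSeen(long seqNo, byte[] data) {
        if (seqNo == UNASSIGNED_SEQ_NO) {
            return true; // nothing to do
        }
        Seen previous = seen.get(seqNo);
        if (previous == null) {
            // Copy the bytes and capture the current stack for later diagnostics.
            seen.put(seqNo, new Seen(data.clone(), new RuntimeException("stack capture previous op")));
        } else if (!Arrays.equals(previous.data, data)) {
            // The earlier capture becomes the cause, so both stacks appear in the failure.
            throw new AssertionError("seqNo [" + seqNo + "] was written twice with different data",
                previous.capturedAt);
        }
        return true;
    }
}
```

`Thread.currentThread().getStackTrace()` would capture the same frames, but turning them into a chained cause takes an extra step, for example creating a throwable and calling `setStackTrace` on it.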

Member

@jasontedor jasontedor left a comment

I left another comment.

DocWriteResponse primaryResponse,
IndexRequest request,
IndexShard replica) throws IOException {
private static Engine.IndexResult executeIndexRequestOnReplica(DocWriteResponse primaryResponse, IndexRequest request,
Member

A reason that I prefer the line-per-parameter form for method definitions that spill over multiple lines is that, had you maintained it in this case, the diff would simply be:

--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -531,6 +531,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     private static Engine.IndexResult executeIndexRequestOnReplica(
             DocWriteResponse primaryResponse,
             IndexRequest request,
+            long primaryTerm,
             IndexShard replica) throws IOException {
 
         final Engine.Index operation;

and git blame would trace that additional parameter back to this PR rather than the entire method declaration.

Contributor Author

bleskes commented May 24, 2017

retest this please.

@bleskes bleskes merged commit 6bc5b1d into elastic:master May 24, 2017
@bleskes bleskes deleted the translog_assert_seqno branch May 24, 2017 11:26
Contributor Author

bleskes commented May 24, 2017

thanks @s1monw @ywelsch @jasontedor

@clintongormley clintongormley added the :Distributed/Distributed and :Distributed/Engine labels and removed the :Translog and :Distributed/Distributed labels Feb 13, 2018
Labels
:Distributed/Engine >enhancement v6.0.0-alpha2
5 participants