Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequence numbers commit data for Lucene uses Iterable interface #20793

Merged
merged 10 commits into from
Oct 12, 2016

Conversation

abeyad
Copy link

@abeyad abeyad commented Oct 7, 2016

Sequence number related data (maximum sequence number, local checkpoint,
and global checkpoint) gets stored in Lucene on each commit. The logical
place to store this data is on each Lucene commit's user commit data
structure (see IndexWriter#setCommitData and the new version
IndexWriter#setLiveCommitData). However, previously we did not store the
maximum sequence number in the commit data because the commit data got
copied over before the Lucene IndexWriter flushed the documents to segments
in the commit. This meant that between the time that the commit data was
set on the IndexWriter and the time that the IndexWriter completes the commit,
documents with higher sequence numbers could have entered the commit.
Hence, we would use FieldStats on the _seq_no field in the documents to get
the maximum sequence number value, but this suffers the drawback that if the
last sequence number in the commit corresponded to a delete document action,
that sequence number would not show up in FieldStats as there would be no
corresponding document in Lucene.

In Lucene 6.2, commit data was changed to take an Iterable interface, so
that the commit data can be calculated and retrieved after all documents
have been flushed. This commit changes max_seq_no so it is stored in the
commit data instead of being calculated from FieldStats, taking advantage of
the deferred calculation of the max_seq_no through passing an Iterable that
dynamically sets the iterator data.

Relates #10708

Sequence number related data (maximum sequence number, local checkpoint,
and global checkpoint) gets stored in Lucene on each commit. The logical
place to store this data is on each Lucene commit's user commit data
structure (see IndexWriter#setCommitData and the new version
IndexWriter#setLiveCommitData). However, previously we did not store the
maximum sequence number in the commit data because the commit data got
copied over before the Lucene IndexWriter flushed the documents to segments
in the commit.  This means that between the time that the commit data was
set on the IndexWriter and the time that the IndexWriter completes the commit,
documents with higher sequence numbers could have entered the commit.
Hence, we would use FieldStats on the _seq_no field in the documents to get
the maximum sequence number value, but this suffers the drawback that if the
last sequence number in the commit corresponded to a delete document action,
that sequence number would not show up in FieldStats as there would be no
corresponding document in Lucene.

In Lucene 6.2, the commit data was changed to take an Iterable interface, so
that the commit data can be calculated and retrieved *after* all documents
have been flushed, while the commit data itself is being set on the Lucene commit.
This commit changes max_seq_no so it is stored in the commit data instead of
being calculated from FieldStats, taking advantage of the deferred calculation
of the max_seq_no through passing an Iterable that dynamically sets the iterator
data.
Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx @abeyad . I left some minor comments. Can we increase the test to cover delete operations?

* all documents, we defer computation of the max_seq_no to the time of invocation of the commit
* data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> deferredCommitData = new HashMap<>(commitData.size() + 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be simpler to just capture the local and global checkpoints before we start and build one map here. This way we don't need two maps.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -1336,10 +1321,26 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
}

if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
logger.trace("committing writer with commit data (max_seq_no excluded) [{}]", commitData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we log this after the commit so we have everything?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do, in doing this, I noticed there is an issue in how the commit data is set - anytime the iterator() is called, it will recompute the maxSeqNo based on the current value of what the SequenceNumbersService returns. We only want this done once so each subsequent time we call writer.getLiveCommitData(), we get the same value that went into the commit. I'm fixing this and will push up a new commit

@abeyad
Copy link
Author

abeyad commented Oct 7, 2016

@bleskes I pushed 1d63334 to address your review comments on how to construct the commit data, and to ensure its safe access on subsequent calls to its iterator. I pushed d396c7e to add document deletion to the tests.

Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a minor suggestion. Also, I think it will be great to have a test that concurrently indexes and commits and makes sure that with every commit point, all ops below the local checkpoint are present and no ops about the max_seq is present. Feel free to add this test here or in a follow up change.

writer.setLiveCommitData(new Iterable<Map.Entry<String, String>>() {
// save the max seq no the first time its computed, so subsequent iterations don't recompute,
// potentially getting a different value
private String computedMaxSeqNoEntry = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like we need this cashing because we load user data from the index writer. maybe we can use lastCommittedSegmentInfos, which can be read earlier when opening the engine?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes I agree with using lastCommittedSegmentInfos from the engine as a solution, but my concern here was that basically we have to document and ensure that no one uses IndexWriter#getLiveCommitData or depends on it for accurate information, otherwise the max_seq_no could be different than what is actually stored in the commit. That's why I added the caching part, which does increase complexity. Do you prefer I remove it and just document that we should never call IndexWriter#getLiveCommitData inside ES code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK to not use IndexWriter#getLiveCommitData - as it may not return what's in the last commit. I suspect this is why it was renamed to say live in the name. Is there anything specific you are concerned about?

Ali Beyad added 2 commits October 10, 2016 10:07
to Lucene, ensuring the sequence number related commit data
in each Lucene commit point matches the invariants of
localCheckpoint <= highest sequence number in commit <= maxSeqNo
@abeyad
Copy link
Author

abeyad commented Oct 12, 2016

@bleskes I pushed 2656776 to remove caching from the iterator and 27532b1 adds a concurrent writes/commit test (and fixes another test)

Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx @abeyad . This looks great. I left some minor comments around testing.

}
commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committed writer with commit data [{}]", commitData);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should be "commit_ting_ writer with commit data"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

try {
initialEngine.index(index);
final String id;
boolean versionConflict = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

initialEngine.seqNoService().getMaxSeqNo(),
// its possible we haven't indexed any documents yet, or its possible that right after a commit, a version conflict
// exception happened so the max seq no was not updated, so here we check greater than or equal to
initialEngine.seqNoService().getMaxSeqNo() != SequenceNumbersService.NO_OPS_PERFORMED || versionConflict ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this extra check? what were you aiming at testing ? feels like it's for the time we had caching?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't asserting anything relevant about the saved commit data any longer, so I'm removing it

assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
equalTo(globalCheckpoint));
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
// after recovering from translog, all docs have been flushed to Lucene segments, so check against primarySeqNo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I follow this comment? can you elaborate?

@@ -1695,6 +1766,118 @@ public void testSeqNoAndCheckpoints() throws IOException {
}
}

// this test writes documents to the engine while concurrently flushing/commit
// and ensuring that the commit points contain the correct sequence number data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++.. thx!

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
threadStatuses.get(threadIdx).set(true); // signal that this thread is done indexing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just use thread.alive() here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, done

IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {

final int numIndexingThreads = randomIntBetween(4, 7);
final int numDocsPerThread = randomIntBetween(500, 1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how long does this test run? wondering if we should make it lighter.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Average run is about 2.2 seconds on my Mac. I agree, could be lighter.. but dropping to randomIntBetween(100,500) reduces the execution time to 1.95 secs on average. Not sure if its worth going lower than that as we likely won't often get useful commits to check against (all docs would go in a single commit).

long maxSeqNo = userData.containsKey(InternalEngine.MAX_SEQ_NO) ?
Long.parseLong(userData.get(InternalEngine.MAX_SEQ_NO)) :
SequenceNumbersService.UNASSIGNED_SEQ_NO;
assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint)) ; // local checkpoint shouldn't go backwards
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

show we have the same for maxSeqNo?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

final Bits bits = leaf.getLiveDocs();
for (int docID = 0; docID < leaf.maxDoc(); docID++) {
if (bits == null || bits.get(docID)) {
bitSet.set((int) values.get(docID));
Copy link
Contributor

@bleskes bleskes Oct 12, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we check this is not set already?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@abeyad
Copy link
Author

abeyad commented Oct 12, 2016

thanks for the review @bleskes !

@abeyad abeyad merged commit 7c2e761 into elastic:feature/seq_no Oct 12, 2016
@abeyad abeyad deleted the deferred_commit_data branch October 12, 2016 16:38
@clintongormley clintongormley added :Engine :Distributed/Engine Anything around managing Lucene and the Translog in an open shard. and removed :Sequence IDs labels Feb 14, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed/Engine Anything around managing Lucene and the Translog in an open shard. >enhancement v6.0.0-alpha1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants