diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index c5f46b6815974..aa282296f844f 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -43,7 +43,6 @@
 import org.apache.lucene.util.InfoStream;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.fieldstats.FieldStats;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
@@ -59,7 +58,6 @@
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.Uid;
-import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.SeqNoStats;
@@ -86,8 +84,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 
-import static org.elasticsearch.index.seqno.SequenceNumbersService.NO_OPS_PERFORMED;
-
 public class InternalEngine extends Engine {
 
     /**
@@ -121,6 +117,7 @@ public class InternalEngine extends Engine {
     private final SequenceNumbersService seqNoService;
     static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
     static final String GLOBAL_CHECKPOINT_KEY = "global_checkpoint";
+    static final String MAX_SEQ_NO = "max_seq_no";
 
     // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
     // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
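For orientation, the commit user data a flush writes ends up with the following shape once this change is in. This is an illustrative, self-contained sketch, not engine code: all values are made up, and the translog keys are spelled out literally where the patch uses the `Translog.*` constants.

```java
import java.util.HashMap;
import java.util.Map;

public class CommitDataShapeSketch {
    public static void main(String[] args) {
        // hypothetical user data for one Lucene commit point; all values are made up
        Map<String, String> commitData = new HashMap<>(6);
        commitData.put("translog_generation", "3");              // Translog.TRANSLOG_GENERATION_KEY
        commitData.put("translog_uuid", "sample-translog-uuid"); // Translog.TRANSLOG_UUID_KEY, made up
        commitData.put("local_checkpoint", "41");                // LOCAL_CHECKPOINT_KEY
        commitData.put("global_checkpoint", "38");               // GLOBAL_CHECKPOINT_KEY
        commitData.put("max_seq_no", "45");                      // MAX_SEQ_NO, the key added by this patch
        System.out.println(commitData);
    }
}
```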
@@ -285,7 +282,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) thr
         boolean success = false;
         try {
             commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
-                ? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
+                ? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null);
             success = true;
         } finally {
             if (success == false) {
@@ -310,7 +307,7 @@ public Translog getTranslog() {
     private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer) throws IOException {
         // commit on a just opened writer will commit even if there are no changes done to it
         // we rely on that for the commit data translog id key
-        final Map<String, String> commitUserData = writer.getCommitData();
+        final Map<String, String> commitUserData = commitDataAsMap(writer);
         if (commitUserData.containsKey("translog_id")) {
             assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID";
             return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id")));
@@ -326,32 +323,20 @@ private Translog.TranslogGeneration loadTranslogIdFromCommit(IndexWriter writer)
     }
 
     private SeqNoStats loadSeqNoStatsFromCommit(IndexWriter writer) throws IOException {
-        final long maxSeqNo;
-        try (IndexReader reader = DirectoryReader.open(writer)) {
-            final FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
-            if (stats != null) {
-                maxSeqNo = (long) stats.getMaxValue();
-            } else {
-                maxSeqNo = NO_OPS_PERFORMED;
+        long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
+        long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
+        long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+        for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
+            final String key = entry.getKey();
+            if (key.equals(LOCAL_CHECKPOINT_KEY)) {
+                localCheckpoint = Long.parseLong(entry.getValue());
+            } else if (key.equals(GLOBAL_CHECKPOINT_KEY)) {
+                globalCheckpoint = Long.parseLong(entry.getValue());
+            } else if (key.equals(MAX_SEQ_NO)) {
+                maxSeqNo = Long.parseLong(entry.getValue());
             }
         }
-        final Map<String, String> commitUserData = writer.getCommitData();
-
-        final long localCheckpoint;
-        if (commitUserData.containsKey(LOCAL_CHECKPOINT_KEY)) {
-            localCheckpoint = Long.parseLong(commitUserData.get(LOCAL_CHECKPOINT_KEY));
-        } else {
-            localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
-        }
-
-        final long globalCheckpoint;
-        if (commitUserData.containsKey(GLOBAL_CHECKPOINT_KEY)) {
-            globalCheckpoint = Long.parseLong(commitUserData.get(GLOBAL_CHECKPOINT_KEY));
-        } else {
-            globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
-        }
-
         return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
     }
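The rewritten `loadSeqNoStatsFromCommit` no longer opens a reader and consults `FieldStats`; everything now comes from the commit's user data. The same three keys can also be read back from any historical commit point via `IndexCommit#getUserData()`, which is what the new test at the bottom of this change does. A minimal sketch of that read-back, with the default constants inlined (in the engine they live on `SequenceNumbersService`):

```java
import org.apache.lucene.index.IndexCommit;

import java.io.IOException;
import java.util.Map;

public class CommitSeqNoStatsSketch {
    // defaults mirror SequenceNumbersService.NO_OPS_PERFORMED / UNASSIGNED_SEQ_NO
    static final long NO_OPS_PERFORMED = -1;
    static final long UNASSIGNED_SEQ_NO = -2;

    /** Returns {maxSeqNo, localCheckpoint, globalCheckpoint} for the given commit point. */
    static long[] seqNoStats(IndexCommit commit) throws IOException {
        final Map<String, String> userData = commit.getUserData();
        final long maxSeqNo = userData.containsKey("max_seq_no")
            ? Long.parseLong(userData.get("max_seq_no")) : NO_OPS_PERFORMED;
        final long localCheckpoint = userData.containsKey("local_checkpoint")
            ? Long.parseLong(userData.get("local_checkpoint")) : NO_OPS_PERFORMED;
        final long globalCheckpoint = userData.containsKey("global_checkpoint")
            ? Long.parseLong(userData.get("global_checkpoint")) : UNASSIGNED_SEQ_NO;
        return new long[] {maxSeqNo, localCheckpoint, globalCheckpoint};
    }
}
```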
@@ -1323,23 +1308,39 @@ private void commitIndexWriter(IndexWriter writer, Translog translog, String syn
         ensureCanFlush();
         try {
             Translog.TranslogGeneration translogGeneration = translog.getGeneration();
-            final Map<String, String> commitData = new HashMap<>(5);
-
-            commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
-            commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
-
-            commitData.put(LOCAL_CHECKPOINT_KEY, Long.toString(seqNoService().getLocalCheckpoint()));
-            commitData.put(GLOBAL_CHECKPOINT_KEY, Long.toString(seqNoService().getGlobalCheckpoint()));
-
-            if (syncId != null) {
-                commitData.put(Engine.SYNC_COMMIT_ID, syncId);
-            }
-            if (logger.isTraceEnabled()) {
-                logger.trace("committing writer with commit data [{}]", commitData);
-            }
+            final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
+            final String translogUUID = translogGeneration.translogUUID;
+            final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
+            final String globalCheckpoint = Long.toString(seqNoService().getGlobalCheckpoint());
+
+            writer.setLiveCommitData(() -> {
+                /*
+                 * The user data captured above (e.g. local/global checkpoints) must be evaluated
+                 * *before* Lucene flushes segments. The maximum sequence number is different: we never
+                 * want it to be less than the last sequence number that went into a Lucene commit,
+                 * otherwise we run the risk of re-using a sequence number for two different documents
+                 * when restoring from this commit point and subsequently writing new documents to the
+                 * index. Since we only know which Lucene documents made it into the final commit after
+                 * the {@link IndexWriter#commit()} call flushes all documents, we defer computation of
+                 * max_seq_no to the invocation of the commit data iterator (which occurs after all
+                 * documents have been flushed to Lucene).
+                 */
+                final Map<String, String> commitData = new HashMap<>(6);
+                commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
+                commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
+                commitData.put(LOCAL_CHECKPOINT_KEY, localCheckpoint);
+                commitData.put(GLOBAL_CHECKPOINT_KEY, globalCheckpoint);
+                if (syncId != null) {
+                    commitData.put(Engine.SYNC_COMMIT_ID, syncId);
+                }
+                commitData.put(MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
+                if (logger.isTraceEnabled()) {
+                    logger.trace("committing writer with commit data [{}]", commitData);
+                }
+                return commitData.entrySet().iterator();
+            });
 
-            indexWriter.setCommitData(commitData);
             writer.commit();
         } catch (Exception ex) {
             try {
@@ -1395,7 +1396,8 @@ public MergeStats getMergeStats() {
     public SequenceNumbersService seqNoService() {
         return seqNoService;
     }
-    @Override
+
+    @Override
     public DocsStats getDocStats() {
         final int numDocs = indexWriter.numDocs();
         final int maxDoc = indexWriter.maxDoc();
@@ -1441,4 +1443,15 @@ boolean indexWriterHasDeletions() {
     public boolean isRecovering() {
         return pendingTranslogRecovery.get();
     }
+
+    /**
+     * Gets the commit data from {@link IndexWriter} as a map.
+     */
+    private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
+        Map<String, String> commitData = new HashMap<>(6);
+        for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
+            commitData.put(entry.getKey(), entry.getValue());
+        }
+        return commitData;
+    }
 }
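The key Lucene API used above is `IndexWriter#setLiveCommitData(Iterable)`, introduced around Lucene 6.2 as the replacement for `setCommitData(Map)`: the iterable's iterator is only invoked when a commit actually captures the user data. The standalone sketch below demonstrates that deferred evaluation in isolation; the directory, analyzer, key name, and values are arbitrary choices for the example, not engine code.

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class LiveCommitDataSketch {
    public static void main(String[] args) throws Exception {
        final AtomicLong maxSeqNo = new AtomicLong(-1); // stands in for seqNoService().getMaxSeqNo()
        try (Directory dir = new RAMDirectory();
             IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
            writer.setLiveCommitData(() -> {
                // evaluated when commit() snapshots the data, not when setLiveCommitData() is called
                final Map<String, String> commitData = new HashMap<>();
                commitData.put("max_seq_no", Long.toString(maxSeqNo.get()));
                return commitData.entrySet().iterator();
            });
            maxSeqNo.set(42); // happens after setLiveCommitData(...) but before the commit
            writer.commit();  // the commit records max_seq_no = 42, not -1
            System.out.println(DirectoryReader.listCommits(dir).get(0).getUserData());
        }
    }
}
```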
diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
index 43652a4f162c7..9e353f1899010 100644
--- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
+++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java
@@ -81,6 +81,13 @@ public long generateSeqNo() {
         return localCheckpointService.generateSeqNo();
     }
 
+    /**
+     * Gets the maximum sequence number seen so far. See {@link LocalCheckpointService#getMaxSeqNo()} for details.
+     */
+    public long getMaxSeqNo() {
+        return localCheckpointService.getMaxSeqNo();
+    }
+
     /**
      * marks the given seqNo as completed. See {@link LocalCheckpointService#markSeqNoAsCompleted(long)} for
      * more details
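The distinction between the value `getMaxSeqNo()` returns and the local checkpoint is what makes the commit-data change above necessary: the max seq no tracks the highest sequence number ever issued, while the local checkpoint only advances over a gap-free prefix of completed operations. The following is a toy, self-contained model of that relationship, not the `LocalCheckpointService` implementation; run with `java -ea` so the asserts fire.

```java
import java.util.BitSet;

public class CheckpointSketch {
    private long nextSeqNo = 0;   // next sequence number to issue
    private long checkpoint = -1; // NO_OPS_PERFORMED
    private final BitSet completed = new BitSet();

    long generateSeqNo() {
        return nextSeqNo++;
    }

    long getMaxSeqNo() {
        return nextSeqNo - 1; // highest number ever issued; -1 if none
    }

    long getLocalCheckpoint() {
        return checkpoint;
    }

    void markSeqNoAsCompleted(long seqNo) {
        completed.set((int) seqNo);
        while (completed.get((int) (checkpoint + 1))) {
            checkpoint++; // advance only over the gap-free completed prefix
        }
    }

    public static void main(String[] args) {
        CheckpointSketch s = new CheckpointSketch();
        long a = s.generateSeqNo(); // 0
        long b = s.generateSeqNo(); // 1
        long c = s.generateSeqNo(); // 2
        s.markSeqNoAsCompleted(a);
        s.markSeqNoAsCompleted(c);          // 1 is still in flight
        assert s.getMaxSeqNo() == 2;        // highest issued so far
        assert s.getLocalCheckpoint() == 0; // the gap at 1 blocks the checkpoint
        s.markSeqNoAsCompleted(b);
        assert s.getLocalCheckpoint() == 2; // prefix is contiguous again
    }
}
```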
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 5f09453987d85..857a6c6546858 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -32,13 +32,19 @@
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.LogDocMergePolicy;
 import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.NoDeletionPolicy;
 import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.TieredMergePolicy;
@@ -51,10 +57,13 @@
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.MockDirectoryWrapper;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.FixedBitSet;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.fieldstats.FieldStats;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -87,6 +96,7 @@
 import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.UidFieldMapper;
+import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
@@ -124,6 +134,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -138,7 +149,9 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -295,6 +308,13 @@ protected InternalEngine createEngine(IndexSettings indexSettings, Store store,
 
     public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
                                long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
+        return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(),
+            maxUnsafeAutoIdTimestamp, refreshListener);
+    }
+
+    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
+                               SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp,
+                               ReferenceManager.RefreshListener refreshListener) {
         IndexWriterConfig iwc = newIndexWriterConfig();
         TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
         final EngineConfig.OpenMode openMode;
@@ -313,7 +333,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
                 // we don't need to notify anybody in this test
             }
         };
-        EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(),
+        EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy,
             mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
             new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener,
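The new `config` overload lets a test supply its own `SnapshotDeletionPolicy`; `testConcurrentWritesAndCommits` below passes one wrapping `NoDeletionPolicy` so that every commit point survives for later inspection. A standalone sketch of that effect, using a throwaway in-memory index rather than the engine:

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class KeepAllCommitsSketch {
    public static void main(String[] args) throws Exception {
        try (Directory dir = new RAMDirectory()) {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer())
                // wrap NoDeletionPolicy so no commit point is ever deleted
                .setIndexDeletionPolicy(new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE));
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                for (int i = 0; i < 3; i++) {
                    writer.addDocument(new Document());
                    writer.commit(); // each commit point survives
                }
            }
            // with the default KeepOnlyLastCommitDeletionPolicy this would print 1
            System.out.println(DirectoryReader.listCommits(dir).size()); // prints 3
        }
    }
}
```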
@@ -583,6 +603,10 @@ public SequenceNumbersService seqNoService() {
             assertThat(
                 Long.parseLong(stats1.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
                 equalTo(SequenceNumbersService.UNASSIGNED_SEQ_NO));
+            assertThat(stats1.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
+            assertThat(
+                Long.parseLong(stats1.getUserData().get(InternalEngine.MAX_SEQ_NO)),
+                equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
 
             maxSeqNo.set(rarely() ? SequenceNumbersService.NO_OPS_PERFORMED : randomIntBetween(0, 1024));
             localCheckpoint.set(
@@ -608,6 +632,8 @@ public SequenceNumbersService seqNoService() {
             assertThat(
                 Long.parseLong(stats2.getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
                 equalTo(globalCheckpoint.get()));
+            assertThat(stats2.getUserData(), hasKey(InternalEngine.MAX_SEQ_NO));
+            assertThat(Long.parseLong(stats2.getUserData().get(InternalEngine.MAX_SEQ_NO)), equalTo(maxSeqNo.get()));
         } finally {
             IOUtils.close(engine);
         }
@@ -1618,13 +1644,14 @@ public void testIndexWriterInfoStream() throws IllegalAccessException {
     }
 
     public void testSeqNoAndCheckpoints() throws IOException {
-        // nocommit: does not test deletes
         final int opCount = randomIntBetween(1, 256);
         long primarySeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
         final String[] ids = new String[]{"1", "2", "3"};
+        final Set<String> indexedIds = new HashSet<>();
         long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
         long replicaLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
         long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+        long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
         InternalEngine initialEngine = null;
 
         try {
@@ -1633,17 +1660,38 @@
                 .seqNoService()
                 .updateAllocationIdsFromMaster(new HashSet<>(Arrays.asList("primary", "replica")), Collections.emptySet());
             for (int op = 0; op < opCount; op++) {
-                final String id = randomFrom(ids);
-                ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null);
-                final Engine.Index index = new Engine.Index(newUid("test#" + id), doc,
-                    SequenceNumbersService.UNASSIGNED_SEQ_NO,
-                    rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
-                    PRIMARY, 0, -1, false);
-                try {
-                    initialEngine.index(index);
+                final String id;
+                boolean versionConflict = false;
+                // mostly index, sometimes delete
+                if (rarely() && indexedIds.isEmpty() == false) {
+                    // we have some docs indexed, so delete one of them
+                    id = randomFrom(indexedIds);
+                    final Engine.Delete delete = new Engine.Delete(
+                        "test", id, newUid("test#" + id), SequenceNumbersService.UNASSIGNED_SEQ_NO,
+                        rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0, false);
+                    try {
+                        initialEngine.delete(delete);
+                        indexedIds.remove(id);
+                    } catch (VersionConflictEngineException e) {
+                        versionConflict = true;
+                    }
+                } else {
+                    // index a document
+                    id = randomFrom(ids);
+                    ParsedDocument doc = testParsedDocument("test#" + id, id, "test", null, -1, -1, testDocumentWithTextField(), SOURCE, null);
+                    final Engine.Index index = new Engine.Index(newUid("test#" + id), doc,
+                        SequenceNumbersService.UNASSIGNED_SEQ_NO,
+                        rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL,
+                        PRIMARY, 0, -1, false);
+                    try {
+                        initialEngine.index(index);
+                        indexedIds.add(id);
+                    } catch (VersionConflictEngineException e) {
+                        versionConflict = true;
+                    }
+                }
+                if (versionConflict == false) {
                     primarySeqNo++;
-                } catch (VersionConflictEngineException e) {
-
                 }
 
                 replicaLocalCheckpoint =
@@ -1653,6 +1701,7 @@
             if (rarely()) {
                 localCheckpoint = primarySeqNo;
+                maxSeqNo = primarySeqNo;
                 globalCheckpoint = replicaLocalCheckpoint;
                 initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
                 initialEngine.flush(true, true);
@@ -1661,6 +1710,7 @@
 
             initialEngine.seqNoService().updateGlobalCheckpointOnPrimary();
 
+            assertEquals(primarySeqNo, initialEngine.seqNoService().getMaxSeqNo());
             assertThat(initialEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
             assertThat(initialEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
             assertThat(initialEngine.seqNoService().stats().getGlobalCheckpoint(), equalTo(replicaLocalCheckpoint));
@@ -1671,6 +1721,9 @@
             assertThat(
                 Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
                 equalTo(globalCheckpoint));
+            assertThat(
+                Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
+                equalTo(maxSeqNo));
         } finally {
             IOUtils.close(initialEngine);
         }
@@ -1681,12 +1734,19 @@
             recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
             recoveringEngine.recoverFromTranslog();
 
+            assertEquals(primarySeqNo, recoveringEngine.seqNoService().getMaxSeqNo());
             assertThat(
                 Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
                 equalTo(primarySeqNo));
             assertThat(
                 Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.GLOBAL_CHECKPOINT_KEY)),
                 equalTo(globalCheckpoint));
+            assertThat(
+                Long.parseLong(recoveringEngine.commitStats().getUserData().get(InternalEngine.MAX_SEQ_NO)),
+                // after recovering from the translog, all documents have been flushed to Lucene segments, so the
+                // committed max seq no must equal the current primary seq no: every operation we assigned a
+                // sequence number to is in the commit
+                equalTo(primarySeqNo));
             assertThat(recoveringEngine.seqNoService().stats().getLocalCheckpoint(), equalTo(primarySeqNo));
             assertThat(recoveringEngine.seqNoService().stats().getMaxSeqNo(), equalTo(primarySeqNo));
             assertThat(recoveringEngine.seqNoService().generateSeqNo(), equalTo(primarySeqNo + 1));
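The verification loop in the next hunk iterates historical commit points with `DirectoryReader.listCommits` and reads each commit's user data. Stripped of the engine specifics, the mechanism looks like the sketch below; the index path taken from `args[0]` is hypothetical.

```java
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Paths;
import java.util.Map;

public class ListCommitsSketch {
    public static void main(String[] args) throws Exception {
        // args[0] is a hypothetical path to a Lucene index directory
        try (Directory dir = FSDirectory.open(Paths.get(args[0]))) {
            for (IndexCommit commit : DirectoryReader.listCommits(dir)) {
                Map<String, String> userData = commit.getUserData();
                System.out.println(commit.getSegmentsFileName() + " -> " + userData);
            }
        }
    }
}
```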
@@ -1695,6 +1755,118 @@
         }
     }
 
+    // this test writes documents to the engine while concurrently flushing/committing
+    // and ensures that the commit points contain the correct sequence number data
+    public void testConcurrentWritesAndCommits() throws Exception {
+        try (final Store store = createStore();
+             final InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
+                 new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE),
+                 IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
+
+            final int numIndexingThreads = scaledRandomIntBetween(3, 6);
+            final int numDocsPerThread = randomIntBetween(500, 1000);
+            final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
+            final List<Thread> indexingThreads = new ArrayList<>();
+            // create N indexing threads to index documents simultaneously
+            for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
+                final int threadIdx = threadNum;
+                Thread indexingThread = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            barrier.await(); // wait for all threads to start at the same time
+                            // index random number of docs
+                            for (int i = 0; i < numDocsPerThread; i++) {
+                                final String id = "thread" + threadIdx + "#" + i;
+                                ParsedDocument doc = testParsedDocument(id, id, "test", null, -1, -1, testDocument(), B_1, null);
+                                engine.index(new Engine.Index(newUid(id), doc));
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+                indexingThreads.add(indexingThread);
+            }
+
+            // start the indexing threads
+            for (Thread thread : indexingThreads) {
+                thread.start();
+            }
+            barrier.await(); // wait for indexing threads to all be ready to start
+
+            // create random commit points
+            boolean doneIndexing;
+            do {
+                doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0;
+                engine.flush(); // flush and commit
+            } while (doneIndexing == false);
+
+            // now, verify all the commits have the correct docs according to the user commit data
+            long prevLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
+            long prevMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
+            for (IndexCommit commit : DirectoryReader.listCommits(store.directory())) {
+                Map<String, String> userData = commit.getUserData();
+                long localCheckpoint = userData.containsKey(InternalEngine.LOCAL_CHECKPOINT_KEY) ?
+                    Long.parseLong(userData.get(InternalEngine.LOCAL_CHECKPOINT_KEY)) :
+                    SequenceNumbersService.NO_OPS_PERFORMED;
+                long maxSeqNo = userData.containsKey(InternalEngine.MAX_SEQ_NO) ?
+                    Long.parseLong(userData.get(InternalEngine.MAX_SEQ_NO)) :
+                    SequenceNumbersService.UNASSIGNED_SEQ_NO;
+                // local checkpoint and max seq no shouldn't go backwards
+                assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint));
+                assertThat(maxSeqNo, greaterThanOrEqualTo(prevMaxSeqNo));
+                try (IndexReader reader = DirectoryReader.open(commit)) {
+                    FieldStats stats = SeqNoFieldMapper.Defaults.FIELD_TYPE.stats(reader);
+                    final long highestSeqNo;
+                    if (stats != null) {
+                        highestSeqNo = (long) stats.getMaxValue();
+                    } else {
+                        highestSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
+                    }
+                    // make sure localCheckpoint <= highest seq no found <= maxSeqNo
+                    assertThat(highestSeqNo, greaterThanOrEqualTo(localCheckpoint));
+                    assertThat(highestSeqNo, lessThanOrEqualTo(maxSeqNo));
+                    // make sure all sequence numbers up to and including the local checkpoint are in the index
+                    FixedBitSet seqNosBitSet = getSeqNosSet(reader, highestSeqNo);
+                    for (int i = 0; i <= localCheckpoint; i++) {
+                        assertTrue("local checkpoint [" + localCheckpoint + "], _seq_no [" + i + "] should be indexed",
+                            seqNosBitSet.get(i));
+                    }
+                }
+                prevLocalCheckpoint = localCheckpoint;
+                prevMaxSeqNo = maxSeqNo;
+            }
+        }
+    }
+
+    private static FixedBitSet getSeqNosSet(final IndexReader reader, final long highestSeqNo) throws IOException {
+        // _seq_no are stored as doc values for the time being, so this is how we get them
+        // (as opposed to searching an indexed field)
+        final FixedBitSet bitSet = new FixedBitSet((int) highestSeqNo + 1);
+        final List<LeafReaderContext> leaves = reader.leaves();
+        if (leaves.isEmpty()) {
+            return bitSet;
+        }
+
+        for (int i = 0; i < leaves.size(); i++) {
+            final LeafReader leaf = leaves.get(i).reader();
+            final NumericDocValues values = leaf.getNumericDocValues(SeqNoFieldMapper.NAME);
+            if (values == null) {
+                continue;
+            }
+            final Bits bits = leaf.getLiveDocs();
+            for (int docID = 0; docID < leaf.maxDoc(); docID++) {
+                if (bits == null || bits.get(docID)) {
+                    final long seqNo = values.get(docID);
+                    assertFalse("should not have more than one document with the same seq_no[" + seqNo + "]", bitSet.get((int) seqNo));
+                    bitSet.set((int) seqNo);
+                }
+            }
+        }
+        return bitSet;
+    }
+
     // #8603: make sure we can separately log IFD's messages
     public void testIndexWriterIFDInfoStream() throws IllegalAccessException {
         assumeFalse("who tests the tester?", VERBOSE);