diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java index 493d809f9dc33..46d19d2a814fe 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java +++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java @@ -277,5 +277,4 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx throw ioe; } } - } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index b384e45f56d22..5be6bed0d577f 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -29,8 +29,12 @@ import org.apache.lucene.document.LatLonDocValuesField; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; import org.apache.lucene.index.FilterCodecReader; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.FilterLeafReader; @@ -40,13 +44,20 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafMetaData; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; @@ -209,7 +220,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc throw new IllegalStateException("no commit found in the directory"); } } - final CommitPoint cp = new CommitPoint(si, directory); + final IndexCommit cp = getIndexCommit(si, directory); try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER) .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) .setIndexCommit(cp) @@ -221,6 +232,13 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc return si; } + /** + * Returns an index commit for the given {@link SegmentInfos} in the given directory. + */ + public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException { + return new CommitPoint(si, directory); + } + /** * This method removes all lucene files from the given directory. It will first try to delete all commit points / segments * files to ensure broken commits or corrupted indices will not be opened in the future. 
If any of the segment files can't be deleted @@ -973,6 +991,88 @@ public static NumericDocValuesField newSoftDeletesField() { return new NumericDocValuesField(SOFT_DELETES_FIELD, 1); } + /** + * Returns an empty leaf reader with the given max docs. The reader will be fully deleted. + */ + public static LeafReader emptyReader(final int maxDoc) { + return new LeafReader() { + final Bits liveDocs = new Bits.MatchNoBits(maxDoc); + + public Terms terms(String field) { + return null; + } + + public NumericDocValues getNumericDocValues(String field) { + return null; + } + + public BinaryDocValues getBinaryDocValues(String field) { + return null; + } + + public SortedDocValues getSortedDocValues(String field) { + return null; + } + + public SortedNumericDocValues getSortedNumericDocValues(String field) { + return null; + } + + public SortedSetDocValues getSortedSetDocValues(String field) { + return null; + } + + public NumericDocValues getNormValues(String field) { + return null; + } + + public FieldInfos getFieldInfos() { + return new FieldInfos(new FieldInfo[0]); + } + + public Bits getLiveDocs() { + return this.liveDocs; + } + + public PointValues getPointValues(String fieldName) { + return null; + } + + public void checkIntegrity() { + } + + public Fields getTermVectors(int docID) { + return null; + } + + public int numDocs() { + return 0; + } + + public int maxDoc() { + return maxDoc; + } + + public void document(int docID, StoredFieldVisitor visitor) { + } + + protected void doClose() { + } + + public LeafMetaData getMetaData() { + return new LeafMetaData(Version.LATEST.major, Version.LATEST, (Sort)null); + } + + public CacheHelper getCoreCacheHelper() { + return null; + } + + public CacheHelper getReaderCacheHelper() { + return null; + } + }; + } + /** * Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive) * in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found. diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 26808330986da..89545af641c28 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -793,7 +793,7 @@ public final CommitStats commitStats() { /** * Global stats on segments. 
  */
-    public final SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
+    public SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
         ensureOpen();
         Set<String> segmentName = new HashSet<>();
         SegmentsStats stats = new SegmentsStats();
diff --git a/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java b/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java
index 7972d426fba02..9630485cbc105 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java
@@ -48,6 +48,11 @@ final class RamAccountingSearcherFactory extends SearcherFactory {
 
     @Override
     public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
+        processReaders(reader, previousReader);
+        return super.newSearcher(reader, previousReader);
+    }
+
+    public void processReaders(IndexReader reader, IndexReader previousReader) {
         final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
 
         // Construct a list of the previous segment readers, we only want to track memory used
@@ -79,6 +84,5 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
             segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed));
             }
         }
-        return super.newSearcher(reader, previousReader);
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
index 26ef259a1e1c6..fc4b0632c8076 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -57,7 +57,7 @@
  *
  * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
  */
-public final class ReadOnlyEngine extends Engine {
+public class ReadOnlyEngine extends Engine {
 
     private final SegmentInfos lastCommittedSegmentInfos;
     private final SeqNoStats seqNoStats;
@@ -66,6 +66,7 @@ public final class ReadOnlyEngine extends Engine {
     private final IndexCommit indexCommit;
     private final Lock indexWriterLock;
     private final DocsStats docsStats;
+    protected final RamAccountingSearcherFactory searcherFactory;
 
     /**
      * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -82,6 +83,7 @@ public final class ReadOnlyEngine extends Engine {
     public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
                           Function<DirectoryReader, DirectoryReader> readerWrapperFunction) {
         super(config);
+        this.searcherFactory = new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService());
         try {
             Store store = config.getStore();
             store.incRef();
@@ -96,14 +98,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
                 this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
                 this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
                 this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
-            reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId());
-            if (config.getIndexSettings().isSoftDeleteEnabled()) {
-                reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
-            }
-            reader = readerWrapperFunction.apply(reader);
-            this.indexCommit = reader.getIndexCommit();
-            this.searcherManager = new SearcherManager(reader,
-                new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
+            this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
+            reader = open(indexCommit);
+            reader = wrapReader(reader, readerWrapperFunction);
+            searcherManager = new SearcherManager(reader, searcherFactory);
             this.docsStats = docsStats(lastCommittedSegmentInfos);
             this.indexWriterLock = indexWriterLock;
             success = true;
@@ -117,8 +115,17 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
         }
     }
 
-    protected DirectoryReader open(final Directory directory) throws IOException {
-        return DirectoryReader.open(directory);
+    protected final DirectoryReader wrapReader(DirectoryReader reader,
+                                               Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
+        reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
+        if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
+            reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
+        }
+        return readerWrapperFunction.apply(reader);
+    }
+
+    protected DirectoryReader open(IndexCommit commit) throws IOException {
+        return DirectoryReader.open(commit);
     }
 
     private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java
index d681a186892db..0b17ec72fbb17 100644
--- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java
+++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java
@@ -154,7 +154,7 @@ final class DefaultSearchContext extends SearchContext {
     private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
     private final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
     private final QueryShardContext queryShardContext;
-    private FetchPhase fetchPhase;
+    private final FetchPhase fetchPhase;
 
     DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
                          Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
@@ -186,7 +186,7 @@ final class DefaultSearchContext extends SearchContext {
 
     @Override
     public void doClose() {
-        // clear and scope phase we have
+        // clear and scope phase we have
         Releasables.close(searcher, engineSearcher);
     }
 
diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
index 04a4629e9a875..8e7c2ef013a43 100644
--- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
+++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java
@@ -217,4 +217,8 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio
     public DirectoryReader getDirectoryReader() {
         return engineSearcher.getDirectoryReader();
     }
+
+    public Engine.Searcher getEngineSearcher() {
+        return engineSearcher;
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 379043fa93954..a0dd8191b6ed6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5057,15 +5057,15 @@ public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception { null, new ReferenceManager.RefreshListener() { @Override - public void beforeRefresh() throws IOException { + public void beforeRefresh() { refreshCounter.incrementAndGet(); } @Override - public void afterRefresh(boolean didRefresh) throws IOException { + public void afterRefresh(boolean didRefresh) { } - }, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) { + }, null, () -> SequenceNumbers.NO_OPS_PERFORMED, new NoneCircuitBreakerService()))) { for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) { final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index bbe4dd268e5ef..8158830e96fb4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -88,6 +88,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -573,13 +574,14 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { - return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier); + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier, + new NoneCircuitBreakerService()); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener externalRefreshListener, ReferenceManager.RefreshListener internalRefreshListener, - Sort indexSort, LongSupplier globalCheckpointSupplier) { + Sort indexSort, LongSupplier globalCheckpointSupplier, CircuitBreakerService breakerService) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); Engine.EventListener listener = new Engine.EventListener() { @@ -596,7 +598,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort, - new NoneCircuitBreakerService(), + breakerService, globalCheckpointSupplier == null ? 
                new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
                globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java
new file mode 100644
index 0000000000000..0cd67e5ebc505
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java
@@ -0,0 +1,518 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.BinaryDocValues;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.FilterDirectoryReader;
+import org.apache.lucene.index.FilterLeafReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.LeafMetaData;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.NumericDocValues;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SortedDocValues;
+import org.apache.lucene.index.SortedNumericDocValues;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.index.StoredFieldVisitor;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.Bits;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.core.internal.io.IOUtils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * This is a stand-alone read-only engine that maintains a lazily loaded index reader that is opened on calls to
+ * {@link Engine#acquireSearcher(String)}. The opened index reader is maintained until there are no references to it anymore,
+ * at which point it releases itself from the engine. The readers returned from this engine are lazy, which allows them to be
+ * released after a search phase finishes and reset before the next one starts. This allows releasing references as soon as
+ * possible on the search layer.
+ *
+ * Internally this class uses a set of wrapper abstractions to allow a reader that is used inside the {@link Engine.Searcher}
+ * returned from {@link #acquireSearcher(String, SearcherScope)} to release and reset its internal resources. This is necessary
+ * in order to, for instance, release all SegmentReaders after a search phase finishes and reopen them before the next search
+ * phase starts. This, together with a throttled threadpool (search_throttled), guarantees that at most N frozen shards have a
+ * low level index reader open at the same time.
+ *
+ * In particular we have LazyDirectoryReader that wraps its LeafReaders (the actual segment readers) inside LazyLeafReaders.
+ * Each of the LazyLeafReaders delegates to a segment LeafReader that can be reset (its reference decremented and nulled out)
+ * once a search phase is finished. Before the next search phase starts we can reopen the corresponding reader and reset the
+ * reference to execute the search phase. This allows the SearchContext to hold on to the same LazyDirectoryReader across its
+ * lifecycle, but under the hood resources (memory) are released while the SearchContext phases are not executing.
+ *
+ * The internal reopen of readers is treated like a refresh and refresh listeners are called upon reopen. This allows refresh
+ * stats to be consumed in order to obtain the number of reopens.
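+ *
+ * A typical interaction from the search layer looks roughly as follows (a sketch only; {@code frozenEngine} stands for an
+ * already constructed instance of this class):
+ * <pre>
+ * try (Engine.Searcher searcher = frozenEngine.acquireSearcher("search")) {
+ *     // ... run a search phase, then drop the segment readers in between phases:
+ *     FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release();
+ *     // ... before the next phase, reopen (counted as a refresh) and re-plug the segment readers:
+ *     FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset();
+ * }
+ * </pre>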
Each of the + * LazyLeafReader delegates to segment LeafReader that can be reset (it's reference decremented and nulled out) on a search phase is + * finished. Before the next search phase starts we can reopen the corresponding reader and reset the reference to execute the search phase. + * This allows the SearchContext to hold on to the same LazyDirectoryReader across its lifecycle but under the hood resources (memory) is + * released while the SearchContext phases are not executing. + * + * The internal reopen of readers is treated like a refresh and refresh listeners are called up-on reopen. This allows to consume refresh + * stats in order to obtain the number of reopens. + */ +public final class FrozenEngine extends ReadOnlyEngine { + private volatile DirectoryReader lastOpenedReader; + + public FrozenEngine(EngineConfig config) { + super(config, null, null, true, Function.identity()); + } + + @Override + protected DirectoryReader open(IndexCommit indexCommit) throws IOException { + // we fake an empty DirectoryReader for the ReadOnlyEngine. this reader is only used + // to initialize the reference manager and to make the refresh call happy which is essentially + // a no-op now + return new DirectoryReader(indexCommit.getDirectory(), new LeafReader[0]) { + @Override + protected DirectoryReader doOpenIfChanged() { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexCommit commit) { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) { + return null; + } + + @Override + public long getVersion() { + return 0; + } + + @Override + public boolean isCurrent() { + return true; // always current + } + + @Override + public IndexCommit getIndexCommit() { + return indexCommit; // TODO maybe we can return an empty commit? + } + + @Override + protected void doClose() { + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + }; + } + + @SuppressForbidden(reason = "we manage references explicitly here") + private synchronized void onReaderClosed(IndexReader.CacheKey key) { + // it might look awkward that we have to check here if the keys match but if we concurrently + // access the lastOpenedReader there might be 2 threads competing for the cached reference in + // a way that thread 1 counts down the lastOpenedReader reference and before thread 1 can execute + // the close listener we already open and assign a new reader to lastOpenedReader. 
+        // In this case the cache key doesn't match and we just ignore it, since we use this method only to null out the
+        // lastOpenedReader member to ensure resources can be GCed
+        if (lastOpenedReader != null && key == lastOpenedReader.getReaderCacheHelper().getKey()) {
+            assert lastOpenedReader.getRefCount() == 0;
+            lastOpenedReader = null;
+        }
+    }
+
+    private synchronized DirectoryReader getOrOpenReader() throws IOException {
+        DirectoryReader reader = null;
+        boolean success = false;
+        try {
+            reader = getReader();
+            if (reader == null) {
+                for (ReferenceManager.RefreshListener listener : config().getInternalRefreshListener()) {
+                    listener.beforeRefresh();
+                }
+                reader = DirectoryReader.open(engineConfig.getStore().directory());
+                searcherFactory.processReaders(reader, null);
+                reader = lastOpenedReader = wrapReader(reader, Function.identity());
+                reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed);
+                for (ReferenceManager.RefreshListener listener : config().getInternalRefreshListener()) {
+                    listener.afterRefresh(true);
+                }
+            }
+            success = true;
+            return reader;
+        } finally {
+            if (success == false) {
+                IOUtils.close(reader);
+            }
+        }
+    }
+
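+    /**
+     * Returns the current reader if one is still open and its reference count could be incremented, or
+     * {@code null} otherwise. Callers must dec-ref the returned reader once they are done with it.
+     */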
+    @SuppressForbidden(reason = "we manage references explicitly here")
+    private synchronized DirectoryReader getReader() throws IOException {
+        DirectoryReader reader = null;
+        boolean success = false;
+        try {
+            if (lastOpenedReader != null && lastOpenedReader.tryIncRef()) {
+                reader = lastOpenedReader;
+            }
+            success = true;
+            return reader;
+        } finally {
+            if (success == false) {
+                IOUtils.close(reader);
+            }
+        }
+    }
+
+    @Override
+    @SuppressWarnings("fallthrough")
+    @SuppressForbidden(reason = "we manage references explicitly here")
+    public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
+        store.incRef();
+        boolean releaseReference = true;
+        try {
+            final boolean maybeOpenReader;
+            switch (source) {
+                case "load_seq_no":
+                case "load_version":
+                    assert false : "this is a read-only engine";
+                case "doc_stats":
+                    assert false : "doc_stats are overwritten";
+                case "segments":
+                case "segments_stats":
+                case "completion_stats":
+                case "refresh_needed":
+                    maybeOpenReader = false;
+                    break;
+                default:
+                    maybeOpenReader = true;
+            }
+            // special case: we only want to report segment stats if we have a reader open. in that case we only get a reader
+            // if we still have one open at the time and can increment its reference
+            DirectoryReader reader = maybeOpenReader ? getOrOpenReader() : getReader();
+            if (reader == null) {
+                // we just hand out a searcher on top of the empty reader that we opened for the ReadOnlyEngine in the
+                // #open(IndexCommit) method. this is the case when we don't have a reader open right now and we get a stats
+                // call or any other call that falls into the category that doesn't trigger a reopen
+                return super.acquireSearcher(source, scope);
+            } else {
+                try {
+                    LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this);
+                    Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader),
+                        () -> IOUtils.close(lazyDirectoryReader, store::decRef));
+                    releaseReference = false;
+                    return newSearcher;
+                } finally {
+                    if (releaseReference) {
+                        reader.decRef(); // don't call close here, we manage the reference ourselves
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        } finally {
+            if (releaseReference) {
+                store.decRef();
+            }
+        }
+    }
+
+    static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) {
+        while (reader instanceof FilterDirectoryReader) {
+            if (reader instanceof LazyDirectoryReader) {
+                return (LazyDirectoryReader) reader;
+            }
+            reader = ((FilterDirectoryReader) reader).getDelegate();
+        }
+        return null;
+    }
+
+    /**
+     * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings
+     * on/after each search phase. This is really important, since otherwise we would hold on to multiple readers across
+     * phases.
+     *
+     * This reader and its leaf reader counterpart extend FilterDirectoryReader/FilterLeafReader for convenient unwrapping
+     * but still override all of their delegate methods. We have tests to ensure we never miss an override; we need to
+     * override every method to make sure the wrapper leaf readers don't register themselves as close listeners on the
+     * wrapped ones, otherwise plugging in new readers on the next search phase would fail.
+     */
+    static final class LazyDirectoryReader extends FilterDirectoryReader {
+
+        private final FrozenEngine engine;
+        private volatile DirectoryReader delegate; // volatile since it might be closed concurrently
+
+        private LazyDirectoryReader(DirectoryReader reader, FrozenEngine engine) throws IOException {
+            super(reader, new SubReaderWrapper() {
+                @Override
+                public LeafReader wrap(LeafReader reader) {
+                    return new LazyLeafReader(reader);
+                }
+            });
+            this.delegate = reader;
+            this.engine = engine;
+        }
+
+        @SuppressForbidden(reason = "we manage references explicitly here")
+        synchronized void release() throws IOException {
+            if (delegate != null) { // we are lenient here, it's ok to double close
+                delegate.decRef();
+                delegate = null;
+                if (tryIncRef()) { // only do this if we are not closed already
+                    // we end up in this case when we are not closed but in an intermediate state where we want to release
+                    // all of the real leaf readers, i.e. in between search phases, but still want to keep this lazy
+                    // reference open. In order to let the heavy real leaf readers be GCed we need to null out the references.
+                    try {
+                        for (LeafReaderContext leaf : leaves()) {
+                            LazyLeafReader reader = (LazyLeafReader) leaf.reader();
+                            reader.in = null;
+                        }
+                    } finally {
+                        decRef();
+                    }
+                }
+            }
+        }
+
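+        /**
+         * Re-opens the underlying reader via the engine and plugs the resulting segment readers back into the
+         * existing {@link LazyLeafReader} wrappers. This is the counterpart of {@link #release()}; the reopen is
+         * treated like a refresh, so refresh listeners fire if a new reader had to be opened.
+         */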
+        void reset() throws IOException {
+            boolean success = false;
+            DirectoryReader reader = engine.getOrOpenReader();
+            try {
+                reset(reader);
+                success = true;
+            } finally {
+                if (success == false) {
+                    IOUtils.close(reader);
+                }
+            }
+        }
+
+        private synchronized void reset(DirectoryReader delegate) {
+            if (this.delegate != null) {
+                throw new AssertionError("lazy reader is not released");
+            }
+            assert (delegate instanceof LazyDirectoryReader) == false : "must not be a LazyDirectoryReader";
+            List<LeafReaderContext> leaves = delegate.leaves();
+            int ord = 0;
+            for (LeafReaderContext leaf : leaves()) {
+                LazyLeafReader reader = (LazyLeafReader) leaf.reader();
+                LeafReader newReader = leaves.get(ord++).reader();
+                assert reader.in == null;
+                reader.in = newReader;
+                assert reader.info.info.equals(Lucene.segmentReader(newReader).getSegmentInfo().info);
+            }
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
+            throw new UnsupportedOperationException();
+        }
+
+        void ensureOpenOrReset() {
+            // ensure we fail early and with good exceptions
+            ensureOpen();
+            if (delegate == null) {
+                throw new AlreadyClosedException("delegate is released");
+            }
+        }
+
+        @Override
+        public long getVersion() {
+            ensureOpenOrReset();
+            return delegate.getVersion();
+        }
+
+        @Override
+        public boolean isCurrent() throws IOException {
+            ensureOpenOrReset();
+            return delegate.isCurrent();
+        }
+
+        @Override
+        public IndexCommit getIndexCommit() throws IOException {
+            ensureOpenOrReset();
+            return delegate.getIndexCommit();
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+            release();
+        }
+
+        @Override
+        public CacheHelper getReaderCacheHelper() {
+            ensureOpenOrReset();
+            return delegate.getReaderCacheHelper();
+        }
+
+        @Override
+        public DirectoryReader getDelegate() {
+            ensureOpenOrReset();
+            return delegate;
+        }
+    }
+
+    /**
+     * We basically duplicate a FilterLeafReader here, since we don't want the incoming reader to register with this
+     * reader as a parent reader. That registration would mean we barf if the incoming reader is closed, and closing
+     * the incoming reader is exactly what we are doing on purpose.
+     */
+    static final class LazyLeafReader extends FilterLeafReader {
+
+        private volatile LeafReader in;
+        private final SegmentCommitInfo info;
+        private final int numDocs;
+        private final int maxDocs;
+
+        private LazyLeafReader(LeafReader in) {
+            super(Lucene.emptyReader(in.maxDoc())); // empty reader here to make FilterLeafReader happy
+            this.info = Lucene.segmentReader(in).getSegmentInfo();
+            this.in = in;
+            numDocs = in.numDocs();
+            maxDocs = in.maxDoc();
+            // don't register the in reader as a subreader here.
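+            // numDocs and maxDocs are captured eagerly above so that they can still be served
+            // while the in reader is released (nulled out) in between search phases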
+ } + + private void ensureOpenOrReleased() { + ensureOpen(); + if (in == null) { + throw new AlreadyClosedException("leaf is already released"); + } + } + + @Override + public Bits getLiveDocs() { + ensureOpenOrReleased(); + return in.getLiveDocs(); + } + + @Override + public FieldInfos getFieldInfos() { + ensureOpenOrReleased(); + return in.getFieldInfos(); + } + + @Override + public PointValues getPointValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getPointValues(field); + } + + @Override + public Fields getTermVectors(int docID) + throws IOException { + ensureOpenOrReleased(); + return in.getTermVectors(docID); + } + + @Override + public int numDocs() { + return numDocs; + } + + @Override + public int maxDoc() { + return maxDocs; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + ensureOpenOrReleased(); + in.document(docID, visitor); + } + + @Override + protected void doClose() throws IOException { + in.close(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + ensureOpenOrReleased(); + return in.getReaderCacheHelper(); + } + + @Override + public CacheHelper getCoreCacheHelper() { + ensureOpenOrReleased(); + return in.getCoreCacheHelper(); + } + + @Override + public Terms terms(String field) throws IOException { + ensureOpenOrReleased(); + return in.terms(field); + } + + @Override + public String toString() { + final StringBuilder buffer = new StringBuilder("LazyLeafReader("); + buffer.append(in); + buffer.append(')'); + return buffer.toString(); + } + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getNumericDocValues(field); + } + + @Override + public BinaryDocValues getBinaryDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getBinaryDocValues(field); + } + + @Override + public SortedDocValues getSortedDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedDocValues(field); + } + + @Override + public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedNumericDocValues(field); + } + + @Override + public SortedSetDocValues getSortedSetDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedSetDocValues(field); + } + + @Override + public NumericDocValues getNormValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getNormValues(field); + } + + @Override + public LeafMetaData getMetaData() { + ensureOpenOrReleased(); + return in.getMetaData(); + } + + @Override + public void checkIntegrity() throws IOException { + ensureOpenOrReleased(); + in.checkIntegrity(); + } + + @Override + public LeafReader getDelegate() { + return in; + } + } + + synchronized boolean isReaderOpen() { + return lastOpenedReader != null; + } // this is mainly for tests +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java new file mode 100644 index 0000000000000..2c16df79122eb --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -0,0 +1,282 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class FrozenEngineTests extends EngineTestCase { + + public void testAcquireReleaseReset() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); + engine.flushAndClose(); + listener.reset(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + assertFalse(frozenEngine.isReaderOpen()); + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); + assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher + .getDirectoryReader()).shardId()); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, listener.afterRefresh.get()); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); + assertFalse(frozenEngine.isReaderOpen()); + assertEquals(1, listener.afterRefresh.get()); + expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), numDocs)); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); + assertEquals(2, listener.afterRefresh.get()); + search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + searcher.close(); + } + } + } + } + + public void testAcquireReleaseResetTwoSearchers() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); + try (InternalEngine engine = 
createEngine(config)) { + int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); + engine.flushAndClose(); + listener.reset(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + assertFalse(frozenEngine.isReaderOpen()); + Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, listener.afterRefresh.get()); + FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); + Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); + search = searcher2.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertTrue(frozenEngine.isReaderOpen()); + assertEquals(2, listener.afterRefresh.get()); + expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), numDocs)); + FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); + assertEquals(2, listener.afterRefresh.get()); + search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + searcher1.close(); + searcher2.close(); + } + } + } + } + + public void testSegmentStats() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, engine); + engine.flushAndClose(); + listener.reset(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); + SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); + assertEquals(1, listener.afterRefresh.get()); + segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(0, segmentsStats.getCount()); + assertEquals(1, listener.afterRefresh.get()); + assertFalse(frozenEngine.isReaderOpen()); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); + segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + searcher.close(); + } + } + } + } + + public void testCircuitBreakerAccounting() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); + long expectedUse; + try 
(InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, engine); + engine.refresh("test"); // pull the reader + expectedUse = breaker.getUsed(); + engine.flushAndClose(); + } + assertTrue(expectedUse > 0); + assertEquals(0, breaker.getUsed()); + listener.reset(); + try (FrozenEngine frozenEngine = new FrozenEngine(config)) { + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); + assertEquals(expectedUse, breaker.getUsed()); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); + assertEquals(1, listener.afterRefresh.get()); + assertEquals(0, breaker.getUsed()); + assertFalse(frozenEngine.isReaderOpen()); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); + assertEquals(expectedUse, breaker.getUsed()); + searcher.close(); + assertEquals(0, breaker.getUsed()); + } + } + } + + private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) throws IOException { + int numDocs = scaledRandomIntBetween(10, 1000); + int numDocsAdded = 0; + for (int i = 0; i < numDocs; i++) { + numDocsAdded++; + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(engine.getLocalCheckpoint()); + } + engine.syncTranslog(); + return numDocsAdded; + } + + public void testSearchConcurrently() throws IOException, InterruptedException { + // even though we don't want this to be searched concurrently we better make sure we release all resources etc. + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, + new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); + try (InternalEngine engine = createEngine(config)) { + int numDocsAdded = addDocuments(globalCheckpoint, engine); + engine.flushAndClose(); + int numIters = randomIntBetween(100, 1000); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + int numThreads = randomIntBetween(2, 4); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch latch = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(() -> { + try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { + barrier.await(); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); + for (int j = 0; j < numIters; j++) { + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), Math.min(10, numDocsAdded)); + assertEquals(search.scoreDocs.length, Math.min(10, numDocsAdded)); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); + } + if (randomBoolean()) { + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); + } + } catch (Exception e) { + throw new AssertionError(e); + } finally { + 
latch.countDown();
+                        }
+                    });
+                    threads[i].start();
+                }
+                latch.await();
+                for (Thread t : threads) {
+                    t.join();
+                }
+                assertFalse(frozenEngine.isReaderOpen());
+                assertEquals(0, breaker.getUsed());
+            }
+        }
+    }
+
+    private static void checkOverrideMethods(Class<?> clazz) throws NoSuchMethodException, SecurityException {
+        final Class<?> superClazz = clazz.getSuperclass();
+        for (Method m : superClazz.getMethods()) {
+            final int mods = m.getModifiers();
+            if (Modifier.isStatic(mods) || Modifier.isAbstract(mods) || Modifier.isFinal(mods) || m.isSynthetic()
+                || m.getName().equals("attributes") || m.getName().equals("getStats")) {
+                continue;
+            }
+            // The point of these checks is to ensure that methods from the super class
+            // are overridden to make sure we never miss a method from FilterLeafReader / FilterDirectoryReader
+            final Method subM = clazz.getMethod(m.getName(), m.getParameterTypes());
+            if (subM.getDeclaringClass() == superClazz
+                && m.getDeclaringClass() != Object.class
+                && m.getDeclaringClass() == subM.getDeclaringClass()) {
+                fail(clazz + " doesn't override " + m + " although it has been declared by its superclass");
+            }
+        }
+    }
+
+    // here we make sure we catch any change to the super classes FilterLeafReader / FilterDirectoryReader
+    public void testOverrideMethods() throws Exception {
+        checkOverrideMethods(FrozenEngine.LazyDirectoryReader.class);
+        checkOverrideMethods(FrozenEngine.LazyLeafReader.class);
+    }
+
+    private class CountingRefreshListener implements ReferenceManager.RefreshListener {
+
+        final AtomicInteger afterRefresh = new AtomicInteger(0);
+        private final AtomicInteger beforeRefresh = new AtomicInteger(0);
+
+        @Override
+        public void beforeRefresh() {
+            beforeRefresh.incrementAndGet();
+        }
+
+        @Override
+        public void afterRefresh(boolean didRefresh) {
+            afterRefresh.incrementAndGet();
+            assertEquals(beforeRefresh.get(), afterRefresh.get());
+        }
+
+        void reset() {
+            afterRefresh.set(0);
+            beforeRefresh.set(0);
+        }
+    }
+}