From 99aa8ad399efe5911453637812f07b821bb9431a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 10 Sep 2018 16:10:31 +0200 Subject: [PATCH 01/15] Add a frozen engine implementation This change adds a `frozen` engine that allows lazily open a directory reader on a read-only shard. The engine wraps general purpose searchers in a LazyDirectoryReader that also allows to release and reset the underlying index readers after any and before secondary search phases. Relates to #34352 F --- .../core/internal/io/IOUtils.java | 12 + .../elasticsearch/common/lucene/Lucene.java | 99 ++++ .../elasticsearch/index/engine/Engine.java | 4 +- .../engine/RamAccountingSearcherFactory.java | 6 +- .../index/engine/ReadOnlyEngine.java | 25 +- .../search/DefaultSearchContext.java | 4 +- .../search/internal/ContextIndexSearcher.java | 4 + .../index/engine/EngineTestCase.java | 11 +- .../index/engine/FrozenEngine.java | 490 ++++++++++++++++++ .../index/engine/FrozenEngineTests.java | 251 +++++++++ 10 files changed, 890 insertions(+), 16 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java index 493d809f9dc33..8796eff787dd0 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java +++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java @@ -278,4 +278,16 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx } } + /** + * An IO operation with a single input. + * @see java.util.function.Consumer + */ + @FunctionalInterface + public interface IOConsumer { + /** + * Performs this operation on the given argument. + */ + void accept(T input) throws IOException; + } + } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index ea363e884613b..0fe8ae6e95481 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -28,8 +28,12 @@ import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.document.LatLonDocValuesField; import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; import org.apache.lucene.index.FilterCodecReader; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.FilterLeafReader; @@ -39,12 +43,20 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafMetaData; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.search.Explanation; import org.apache.lucene.search.FieldDoc; @@ -217,6 +229,10 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc return si; } + public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException { + return new CommitPoint(si, directory); + } + /** * This method removes all lucene files from the given directory. It will first try to delete all commit points / segments * files to ensure broken commits or corrupted indices will not be opened in the future. If any of the segment files can't be deleted @@ -967,4 +983,87 @@ public CacheHelper getReaderCacheHelper() { public static NumericDocValuesField newSoftDeletesField() { return new NumericDocValuesField(SOFT_DELETES_FIELD, 1); } + + /** + * Returns an empty leaf reader with the given max docs. The reader will be fully deleted. + */ + public static LeafReader emptyReader(final int maxDoc) { + return new LeafReader() { + final Bits liveDocs = new Bits.MatchNoBits(maxDoc); + + public Terms terms(String field) { + return null; + } + + public NumericDocValues getNumericDocValues(String field) { + return null; + } + + public BinaryDocValues getBinaryDocValues(String field) { + return null; + } + + public SortedDocValues getSortedDocValues(String field) { + return null; + } + + public SortedNumericDocValues getSortedNumericDocValues(String field) { + return null; + } + + public SortedSetDocValues getSortedSetDocValues(String field) { + return null; + } + + public NumericDocValues getNormValues(String field) { + return null; + } + + public FieldInfos getFieldInfos() { + return new FieldInfos(new FieldInfo[0]); + } + + public Bits getLiveDocs() { + return this.liveDocs; + } + + public PointValues getPointValues(String fieldName) { + return null; + } + + public void checkIntegrity() { + } + + public Fields getTermVectors(int docID) { + return null; + } + + public int numDocs() { + return 0; + } + + public int maxDoc() { + return maxDoc; + } + + public void document(int docID, StoredFieldVisitor visitor) { + } + + protected void doClose() { + } + + public LeafMetaData getMetaData() { + return new LeafMetaData(Version.LATEST.major, Version.LATEST, (Sort)null); + } + + public CacheHelper getCoreCacheHelper() { + return null; + } + + public CacheHelper getReaderCacheHelper() { + return null; + } + }; + } + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 460501c8b5238..18f6ec09344ca 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -43,7 +43,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; -import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; @@ -66,6 +65,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -784,7 +784,7 @@ public final CommitStats commitStats() { /** * Global stats on segments. */ - public final SegmentsStats segmentsStats(boolean includeSegmentFileSizes) { + public SegmentsStats segmentsStats(boolean includeSegmentFileSizes) { ensureOpen(); Set segmentName = new HashSet<>(); SegmentsStats stats = new SegmentsStats(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java b/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java index 7972d426fba02..9630485cbc105 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java +++ b/server/src/main/java/org/elasticsearch/index/engine/RamAccountingSearcherFactory.java @@ -48,6 +48,11 @@ final class RamAccountingSearcherFactory extends SearcherFactory { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException { + processReaders(reader, previousReader); + return super.newSearcher(reader, previousReader); + } + + public void processReaders(IndexReader reader, IndexReader previousReader) { final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING); // Construct a list of the previous segment readers, we only want to track memory used @@ -79,6 +84,5 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed)); } } - return super.newSearcher(reader, previousReader); } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 26ef259a1e1c6..b6443187de902 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -57,7 +57,7 @@ * * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function) */ -public final class ReadOnlyEngine extends Engine { +public class ReadOnlyEngine extends Engine { private final SegmentInfos lastCommittedSegmentInfos; private final SeqNoStats seqNoStats; @@ -66,6 +66,7 @@ public final class ReadOnlyEngine extends Engine { private final IndexCommit indexCommit; private final Lock indexWriterLock; private final DocsStats docsStats; + protected final RamAccountingSearcherFactory searcherFactory; /** * Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened @@ -82,6 +83,7 @@ public final class ReadOnlyEngine extends Engine { public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock, Function readerWrapperFunction) { super(config); + this.searcherFactory = new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()); try { Store store = config.getStore(); store.incRef(); @@ -96,14 +98,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; - reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId()); - if (config.getIndexSettings().isSoftDeleteEnabled()) { - reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); - } - reader = readerWrapperFunction.apply(reader); - this.indexCommit = reader.getIndexCommit(); - this.searcherManager = new SearcherManager(reader, - new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService())); + reader = open(directory); + reader = wrapReader(reader, readerWrapperFunction); + searcherManager = new SearcherManager(reader, searcherFactory); + this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); this.docsStats = docsStats(lastCommittedSegmentInfos); this.indexWriterLock = indexWriterLock; success = true; @@ -117,6 +115,15 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats } } + protected final DirectoryReader wrapReader(DirectoryReader reader, + Function readerWrapperFunction) throws IOException { + reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); + if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { + reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); + } + return readerWrapperFunction.apply(reader); + } + protected DirectoryReader open(final Directory directory) throws IOException { return DirectoryReader.open(directory); } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index d681a186892db..0b17ec72fbb17 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -154,7 +154,7 @@ final class DefaultSearchContext extends SearchContext { private final Map searchExtBuilders = new HashMap<>(); private final Map, Collector> queryCollectors = new HashMap<>(); private final QueryShardContext queryShardContext; - private FetchPhase fetchPhase; + private final FetchPhase fetchPhase; DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget, Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService, @@ -186,7 +186,7 @@ final class DefaultSearchContext extends SearchContext { @Override public void doClose() { - // clear and scope phase we have + // clear and scope phase we have Releasables.close(searcher, engineSearcher); } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 04a4629e9a875..8e7c2ef013a43 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -217,4 +217,8 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio public DirectoryReader getDirectoryReader() { return engineSearcher.getDirectoryReader(); } + + public Engine.Searcher getEngineSearcher() { + return engineSearcher; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 12f0d645d8a87..b78506fa8c938 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -81,6 +81,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -545,6 +546,13 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { + return config(indexSettings, store, translogPath, mergePolicy, refreshListener, indexSort, globalCheckpointSupplier, + new NoneCircuitBreakerService()); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier, + CircuitBreakerService breakerService) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); Engine.EventListener listener = new Engine.EventListener() { @@ -559,8 +567,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, - new NoneCircuitBreakerService(), - globalCheckpointSupplier == null ? + breakerService, globalCheckpointSupplier == null ? new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) : globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier()); return config; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java new file mode 100644 index 0000000000000..7572061485931 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -0,0 +1,490 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.engine; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.FilterDirectoryReader; +import org.apache.lucene.index.FilterLeafReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.LeafMetaData; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SegmentCommitInfo; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.Bits; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.function.Function; + +/** + * This is a stand-alone read-only engine that maintains a lazy loaded index reader that is opened on calls to + * {@link Engine#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore and then + * releases itself from the engine. The readers returned from this engine are lazy which allows release after and reset before a search + * phase starts. This allows releasing references as soon as possible on the search layer. + */ +public final class FrozenEngine extends ReadOnlyEngine { + private final CounterMetric openedReaders = new CounterMetric(); + private volatile DirectoryReader lastOpenedReader; + + public FrozenEngine(EngineConfig config) { + super(config, null, null, true, Function.identity()); + } + + @Override + protected DirectoryReader open(Directory directory) throws IOException { + // we fake an empty directly reader for the ReadOnlyEngine. this reader is only used + // to initialize the reference manager and to make the refresh call happy which is essentially + // a no-op now + IndexCommit indexCommit = Lucene.getIndexCommit(getLastCommittedSegmentInfos(), directory); + return new DirectoryReader(directory, new LeafReader[0]) { + @Override + protected DirectoryReader doOpenIfChanged() { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexCommit commit) { + return null; + } + + @Override + protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) { + return null; + } + + @Override + public long getVersion() { + return 0; + } + + @Override + public boolean isCurrent() { + return true; // always current + } + + @Override + public IndexCommit getIndexCommit() { + return indexCommit; // TODO maybe we can return an empty commit? + } + + @Override + protected void doClose() { + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; + } + }; + } + + @SuppressForbidden(reason = "we manage references explicitly here") + private synchronized void onReaderClosed(IndexReader.CacheKey key) { + if (lastOpenedReader != null && key == lastOpenedReader.getReaderCacheHelper().getKey()) { + assert lastOpenedReader.getRefCount() == 0; + lastOpenedReader = null; + } + } + + @SuppressForbidden(reason = "we manage references explicitly here") + private synchronized DirectoryReader getOrOpenReader(boolean doOpen) throws IOException { + DirectoryReader reader = null; + boolean success = false; + try { + if (lastOpenedReader == null || lastOpenedReader.tryIncRef() == false) { + if (doOpen) { + reader = DirectoryReader.open(engineConfig.getStore().directory()); + searcherFactory.processReaders(reader, null); + openedReaders.inc(); + reader = lastOpenedReader = wrapReader(reader, Function.identity()); + reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); + } + } else { + reader = lastOpenedReader; + } + success = true; + return reader; + } finally { + if (success == false) { + IOUtils.close(reader); + } + } + } + + @Override + @SuppressWarnings("fallthrough") + @SuppressForbidden( reason = "we manage references explicitly here") + public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { + store.incRef(); + boolean success = false; + try { + final boolean openReader; + switch (source) { + case "load_seq_no": + case "load_version": + assert false : "this is a read-only engine"; + case "doc_stats": + assert false : "doc_stats are overwritten"; + case "segments": + case "segments_stats": + case "completion_stats": + case "refresh_needed": + openReader = false; + break; + default: + openReader = true; + } + // special case we only want to report segment stats if we have a reader open. in that case we only get a reader if we still + // have one open at the time and can inc it's reference. + DirectoryReader reader = getOrOpenReader(openReader); + if (reader == null) { + store.decRef(); + success = true; + // we just hand out an empty searcher in this case + return super.acquireSearcher(source, scope); + } else { + try { + LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader); + FrozenEngineSearcher newSearcher = new FrozenEngineSearcher(source, lazyDirectoryReader, + new IndexSearcher(lazyDirectoryReader), + s -> { + try { + s.getIndexReader().close(); + } finally { + store.decRef(); + } + }, logger); + success = true; + return newSearcher; + } finally { + if (success == false) { + reader.decRef(); // don't call close here we manage reference ourselves + } + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } finally { + if (success == false) { + store.decRef(); + } + } + } + + /** + * A Searcher impl that makes it straight forward to release readers after a search phase + */ + final class FrozenEngineSearcher extends Searcher { + + private final LazyDirectoryReader lazyDirectoryReader; + + FrozenEngineSearcher(String source, LazyDirectoryReader lazyDirectoryReader, IndexSearcher searcher, + IOUtils.IOConsumer + onClose, Logger logger) { + super(source, searcher, onClose, logger); + this.lazyDirectoryReader = lazyDirectoryReader; + } + + void releaseReader() throws IOException { + lazyDirectoryReader.release(); + } + + void resetReader() throws IOException { + lazyDirectoryReader.reset(getOrOpenReader(true)); + } + } + + /** + * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings + * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. + * + * This reader and it's leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still + * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure + * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers + * on the next search phase. + */ + static final class LazyDirectoryReader extends FilterDirectoryReader { + + private volatile DirectoryReader delegate; // volatile since it might be closed concurrently + + private LazyDirectoryReader(DirectoryReader reader) throws IOException { + super(reader, new SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader reader) { + return new LazyLeafReader(reader); + }; + }); + this.delegate = reader; + } + + @SuppressForbidden(reason = "we manage references explicitly here") + synchronized void release() throws IOException { + if (delegate != null) { // we are lenient here it's ok to double close + delegate.decRef(); + delegate = null; + if (tryIncRef()) { // only do this if we are not closed already + try { + for (LeafReaderContext leaf : leaves()) { + LazyLeafReader reader = (LazyLeafReader) leaf.reader(); + reader.in = null; + } + } finally { + decRef(); + } + } + } + } + + synchronized void reset(DirectoryReader delegate) { + if (this.delegate != null) { + throw new IllegalStateException("lazy reader is not released"); + } + assert (delegate instanceof LazyDirectoryReader) == false : "must not be a LazyDirectoryReader"; + List leaves = delegate.leaves(); + int ord = 0; + for (LeafReaderContext leaf : leaves()) { + LazyLeafReader reader = (LazyLeafReader) leaf.reader(); + LeafReader newReader = leaves.get(ord++).reader(); + assert reader.in == null; + reader.in = newReader; + assert reader.info.info.equals(Lucene.segmentReader(newReader).getSegmentInfo().info); + } + this.delegate = delegate; + } + + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) { + throw new UnsupportedOperationException(); + } + + void ensureOpenOrReset() { + // ensure we fail early and with good exceptions + ensureOpen(); + if (delegate == null) { + throw new AlreadyClosedException("delegate is released"); + } + } + + @Override + public long getVersion() { + ensureOpenOrReset(); + return delegate.getVersion(); + } + + @Override + public boolean isCurrent() throws IOException { + ensureOpenOrReset(); + return delegate.isCurrent(); + } + + @Override + public IndexCommit getIndexCommit() throws IOException { + ensureOpenOrReset(); + return delegate.getIndexCommit(); + } + + @Override + protected void doClose() throws IOException { + release(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + ensureOpenOrReset(); + return delegate.getReaderCacheHelper(); + } + + @Override + public DirectoryReader getDelegate() { + ensureOpenOrReset(); + return delegate; + } + } + + /** + * We basically duplicate a FilterLeafReader here since we don't want the + * incoming reader to register with this reader as a parent reader. This would mean we barf if the incoming + * reader is closed and that is what we actually doing on purpose. + */ + static final class LazyLeafReader extends FilterLeafReader { + + private volatile LeafReader in; + private final SegmentCommitInfo info; + private final int numDocs; + private final int maxDocs; + + private LazyLeafReader(LeafReader in) { + super(Lucene.emptyReader(in.maxDoc())); // empty reader here to make FilterLeafReader happy + this.info = Lucene.segmentReader(in).getSegmentInfo(); + this.in = in; + numDocs = in.numDocs(); + maxDocs = in.maxDoc(); + // don't register in reader as a subreader here. + } + + private void ensureOpenOrReleased() { + ensureOpen(); + if (in == null) { + throw new AlreadyClosedException("leaf is already released"); + } + } + + @Override + public Bits getLiveDocs() { + ensureOpenOrReleased(); + return in.getLiveDocs(); + } + + @Override + public FieldInfos getFieldInfos() { + ensureOpenOrReleased(); + return in.getFieldInfos(); + } + + @Override + public PointValues getPointValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getPointValues(field); + } + + @Override + public Fields getTermVectors(int docID) + throws IOException { + ensureOpenOrReleased(); + return in.getTermVectors(docID); + } + + @Override + public int numDocs() { + return numDocs; + } + + @Override + public int maxDoc() { + return maxDocs; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + ensureOpenOrReleased(); + in.document(docID, visitor); + } + + @Override + protected void doClose() throws IOException { + in.close(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + ensureOpenOrReleased(); + return in.getReaderCacheHelper(); + } + + @Override + public CacheHelper getCoreCacheHelper() { + ensureOpenOrReleased(); + return in.getCoreCacheHelper(); + } + + @Override + public Terms terms(String field) throws IOException { + ensureOpenOrReleased(); + return in.terms(field); + } + + @Override + public String toString() { + final StringBuilder buffer = new StringBuilder("LazyLeafReader("); + buffer.append(in); + buffer.append(')'); + return buffer.toString(); + } + + @Override + public NumericDocValues getNumericDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getNumericDocValues(field); + } + + @Override + public BinaryDocValues getBinaryDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getBinaryDocValues(field); + } + + @Override + public SortedDocValues getSortedDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedDocValues(field); + } + + @Override + public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedNumericDocValues(field); + } + + @Override + public SortedSetDocValues getSortedSetDocValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getSortedSetDocValues(field); + } + + @Override + public NumericDocValues getNormValues(String field) throws IOException { + ensureOpenOrReleased(); + return in.getNormValues(field); + } + + @Override + public LeafMetaData getMetaData() { + ensureOpenOrReleased(); + return in.getMetaData(); + } + + @Override + public void checkIntegrity() throws IOException { + ensureOpenOrReleased(); + in.checkIntegrity(); + } + + @Override + public LeafReader getDelegate() { + return in; + } + } + + // TODO expose this as stats on master + long getOpenedReaders() { + return openedReaders.count(); + } + + synchronized boolean isReaderOpen() { + return lastOpenedReader != null; + } // this is mainly for tests +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java new file mode 100644 index 0000000000000..6237677ed2fd7 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -0,0 +1,251 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicLong; + +public class FrozenEngineTests extends EngineTestCase { + + public void testAcquireReleaseReset() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + assertFalse(frozenEngine.isReaderOpen()); + FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher + .getDirectoryReader()).shardId()); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + assertEquals(1, frozenEngine.getOpenedReaders()); + searcher.releaseReader(); + assertFalse(frozenEngine.isReaderOpen()); + assertEquals(1, frozenEngine.getOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); + searcher.resetReader(); + assertEquals(2, frozenEngine.getOpenedReaders()); + search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + searcher.close(); + } + } + } + } + + public void testAcquireReleaseResetTwoSearchers() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + assertFalse(frozenEngine.isReaderOpen()); + FrozenEngine.FrozenEngineSearcher searcher1 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + assertEquals(1, frozenEngine.getOpenedReaders()); + searcher1.releaseReader(); + FrozenEngine.FrozenEngineSearcher searcher2 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + assertTrue(frozenEngine.isReaderOpen()); + assertEquals(2, frozenEngine.getOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); + searcher1.resetReader(); + assertEquals(2, frozenEngine.getOpenedReaders()); + search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + searcher1.close(); + searcher2.close(); + } + } + } + } + + public void testSegmentStats() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + int numDocs = scaledRandomIntBetween(10, 1000); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + searcher.releaseReader(); + assertEquals(1, frozenEngine.getOpenedReaders()); + segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(0, segmentsStats.getCount()); + assertEquals(1, frozenEngine.getOpenedReaders()); + assertFalse(frozenEngine.isReaderOpen()); + searcher.resetReader(); + segmentsStats = frozenEngine.segmentsStats(randomBoolean()); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + searcher.close(); + } + } + } + } + + public void testCircuitBreakerAccounting() throws IOException { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + int numDocs = scaledRandomIntBetween(10, 1000); + CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); + long expectedUse; + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.refresh("test"); // pull the reader + expectedUse = breaker.getUsed(); + engine.flushAndClose(); + } + assertTrue(expectedUse > 0); + assertEquals(0, breaker.getUsed()); + try (FrozenEngine frozenEngine = new FrozenEngine(config)) { + FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher)frozenEngine.acquireSearcher("test"); + assertEquals(expectedUse, breaker.getUsed()); + searcher.releaseReader(); + assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(0, breaker.getUsed()); + assertFalse(frozenEngine.isReaderOpen()); + searcher.resetReader(); + assertEquals(expectedUse, breaker.getUsed()); + searcher.close(); + assertEquals(0, breaker.getUsed()); + } + } + } + + private void addDocuments(AtomicLong globalCheckpoint, int numDocs, InternalEngine engine) throws IOException { + for (int i = 0; i < numDocs; i++) { + if (rarely()) { + continue; // gap in sequence number + } + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false)); + if (rarely()) { + engine.flush(); + } + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + } + engine.syncTranslog(); + } + + public void testSearchConcurrently() throws IOException, InterruptedException { + // even though we don't want this to be searched concurrently we better make sure we release all resources etc. + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); + int numDocs = scaledRandomIntBetween(10, 1000); + CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); + try (InternalEngine engine = createEngine(config)) { + addDocuments(globalCheckpoint, numDocs, engine); + engine.flushAndClose(); + int numIters = randomIntBetween(100, 1000); + try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { + int numThreads = randomIntBetween(2, 4); + Thread[] threads = new Thread[numThreads]; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch latch = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; i++) { + threads[i] = new Thread(() -> { + try (FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine + .acquireSearcher("test")) { + barrier.await(); + searcher.releaseReader(); + for (int j = 0; j < numIters; j++) { + searcher.resetReader(); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); + assertEquals(search.scoreDocs.length, 10); + searcher.releaseReader(); + } + if (randomBoolean()) { + searcher.resetReader(); + } + } catch (Exception e) { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + }); + threads[i].start(); + } + latch.await(); + for (Thread t : threads) { + t.join(); + } + assertFalse(frozenEngine.isReaderOpen()); + assertEquals(0, breaker.getUsed()); + } + } + } + } + + private static void checkOverrideMethods(Class clazz) throws NoSuchMethodException, SecurityException { + final Class superClazz = clazz.getSuperclass(); + for (Method m : superClazz.getMethods()) { + final int mods = m.getModifiers(); + if (Modifier.isStatic(mods) || Modifier.isAbstract(mods) || Modifier.isFinal(mods) || m.isSynthetic() + || m.getName().equals("attributes") || m.getName().equals("getStats")) { + continue; + } + // The point of these checks is to ensure that methods from the super class + // are overwritten to make sure we never miss a method from FilterLeafReader / FilterDirectoryReader + final Method subM = clazz.getMethod(m.getName(), m.getParameterTypes()); + if (subM.getDeclaringClass() == superClazz + && m.getDeclaringClass() != Object.class + && m.getDeclaringClass() == subM.getDeclaringClass()) { + fail(clazz + " doesn't override" + m + " although it has been declared by it's superclass"); + } + } + } + + // here we make sure we catch any change to their super classes FilterLeafReader / FilterDirectoryReader + public void testOverrideMethods() throws Exception { + checkOverrideMethods(FrozenEngine.LazyDirectoryReader.class); + checkOverrideMethods(FrozenEngine.LazyLeafReader.class); + } +} From 900917c8edf07fe8d3e4b684a5392dd0a844569e Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 16 Oct 2018 16:37:46 +0200 Subject: [PATCH 02/15] remove frozen index searcher since Engine.Searcher is now final --- .../index/engine/FrozenEngine.java | 46 +++++++------------ .../index/engine/FrozenEngineTests.java | 37 ++++++++------- 2 files changed, 35 insertions(+), 48 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 7572061485931..0f3698c965af2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -169,15 +169,8 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } else { try { LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader); - FrozenEngineSearcher newSearcher = new FrozenEngineSearcher(source, lazyDirectoryReader, - new IndexSearcher(lazyDirectoryReader), - s -> { - try { - s.getIndexReader().close(); - } finally { - store.decRef(); - } - }, logger); + Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader), + () -> IOUtils.close(lazyDirectoryReader, store::decRef)); success = true; return newSearcher; } finally { @@ -195,27 +188,22 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } } - /** - * A Searcher impl that makes it straight forward to release readers after a search phase - */ - final class FrozenEngineSearcher extends Searcher { - - private final LazyDirectoryReader lazyDirectoryReader; - - FrozenEngineSearcher(String source, LazyDirectoryReader lazyDirectoryReader, IndexSearcher searcher, - IOUtils.IOConsumer - onClose, Logger logger) { - super(source, searcher, onClose, logger); - this.lazyDirectoryReader = lazyDirectoryReader; - } + void release(LazyDirectoryReader reader) throws IOException { + reader.release(); + } - void releaseReader() throws IOException { - lazyDirectoryReader.release(); - } + void reset(LazyDirectoryReader reader) throws IOException { + reader.reset(getOrOpenReader(true)); + } - void resetReader() throws IOException { - lazyDirectoryReader.reset(getOrOpenReader(true)); + static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { + while (reader instanceof FilterDirectoryReader) { + if (reader instanceof LazyDirectoryReader) { + return (LazyDirectoryReader) reader; + } + reader = ((FilterDirectoryReader) reader).getDelegate(); } + return null; } /** @@ -242,7 +230,7 @@ public LeafReader wrap(LeafReader reader) { } @SuppressForbidden(reason = "we manage references explicitly here") - synchronized void release() throws IOException { + private synchronized void release() throws IOException { if (delegate != null) { // we are lenient here it's ok to double close delegate.decRef(); delegate = null; @@ -259,7 +247,7 @@ synchronized void release() throws IOException { } } - synchronized void reset(DirectoryReader delegate) { + private synchronized void reset(DirectoryReader delegate) { if (this.delegate != null) { throw new IllegalStateException("lazy reader is not released"); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 6237677ed2fd7..32628e3005708 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -38,18 +38,18 @@ public void testAcquireReleaseReset() throws IOException { engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); - FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher .getDirectoryReader()).shardId()); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertFalse(frozenEngine.isReaderOpen()); assertEquals(1, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -70,19 +70,19 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); - FrozenEngine.FrozenEngineSearcher searcher1 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - searcher1.releaseReader(); - FrozenEngine.FrozenEngineSearcher searcher2 = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); + Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertTrue(frozenEngine.isReaderOpen()); assertEquals(2, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); - searcher1.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -103,16 +103,16 @@ public void testSegmentStats() throws IOException { addDocuments(globalCheckpoint, numDocs, engine); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { - FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(1, frozenEngine.getOpenedReaders()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); assertEquals(1, frozenEngine.getOpenedReaders()); assertFalse(frozenEngine.isReaderOpen()); - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); searcher.close(); @@ -140,13 +140,13 @@ public void testCircuitBreakerAccounting() throws IOException { assertTrue(expectedUse > 0); assertEquals(0, breaker.getUsed()); try (FrozenEngine frozenEngine = new FrozenEngine(config)) { - FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher)frozenEngine.acquireSearcher("test"); + Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(1, frozenEngine.getOpenedReaders()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertEquals(expectedUse, breaker.getUsed()); searcher.close(); assertEquals(0, breaker.getUsed()); @@ -191,19 +191,18 @@ public void testSearchConcurrently() throws IOException, InterruptedException { CountDownLatch latch = new CountDownLatch(numThreads); for (int i = 0; i < numThreads; i++) { threads[i] = new Thread(() -> { - try (FrozenEngine.FrozenEngineSearcher searcher = (FrozenEngine.FrozenEngineSearcher) frozenEngine - .acquireSearcher("test")) { + try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { barrier.await(); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); for (int j = 0; j < numIters; j++) { - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); - searcher.releaseReader(); + frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); } if (randomBoolean()) { - searcher.resetReader(); + frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); } } catch (Exception e) { throw new AssertionError(e); From 8324b72a7bb49c62feb3e8275f969b802a162928 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 16 Oct 2018 16:49:08 +0200 Subject: [PATCH 03/15] simplify reset/release --- .../index/engine/FrozenEngine.java | 21 +++++++--------- .../index/engine/FrozenEngineTests.java | 24 +++++++++---------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 0f3698c965af2..d04c5d6ae9c46 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.index.engine; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FieldInfos; @@ -168,7 +167,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin return super.acquireSearcher(source, scope); } else { try { - LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader); + LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this); Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader), () -> IOUtils.close(lazyDirectoryReader, store::decRef)); success = true; @@ -188,14 +187,6 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } } - void release(LazyDirectoryReader reader) throws IOException { - reader.release(); - } - - void reset(LazyDirectoryReader reader) throws IOException { - reader.reset(getOrOpenReader(true)); - } - static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { while (reader instanceof FilterDirectoryReader) { if (reader instanceof LazyDirectoryReader) { @@ -217,9 +208,10 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { */ static final class LazyDirectoryReader extends FilterDirectoryReader { + private final FrozenEngine engine; private volatile DirectoryReader delegate; // volatile since it might be closed concurrently - private LazyDirectoryReader(DirectoryReader reader) throws IOException { + private LazyDirectoryReader(DirectoryReader reader, FrozenEngine engine) throws IOException { super(reader, new SubReaderWrapper() { @Override public LeafReader wrap(LeafReader reader) { @@ -227,10 +219,11 @@ public LeafReader wrap(LeafReader reader) { }; }); this.delegate = reader; + this.engine = engine; } @SuppressForbidden(reason = "we manage references explicitly here") - private synchronized void release() throws IOException { + synchronized void release() throws IOException { if (delegate != null) { // we are lenient here it's ok to double close delegate.decRef(); delegate = null; @@ -247,6 +240,10 @@ private synchronized void release() throws IOException { } } + void reset() throws IOException { + reset(engine.getOrOpenReader(true)); + } + private synchronized void reset(DirectoryReader delegate) { if (this.delegate != null) { throw new IllegalStateException("lazy reader is not released"); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 32628e3005708..94ba1d83344dc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -45,11 +45,11 @@ public void testAcquireReleaseReset() throws IOException { TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertFalse(frozenEngine.isReaderOpen()); assertEquals(1, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -75,14 +75,14 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertEquals(1, frozenEngine.getOpenedReaders()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); assertTrue(frozenEngine.isReaderOpen()); assertEquals(2, frozenEngine.getOpenedReaders()); expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); assertEquals(2, frozenEngine.getOpenedReaders()); search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); @@ -106,13 +106,13 @@ public void testSegmentStats() throws IOException { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertEquals(1, frozenEngine.getOpenedReaders()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); assertEquals(1, frozenEngine.getOpenedReaders()); assertFalse(frozenEngine.isReaderOpen()); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); searcher.close(); @@ -142,11 +142,11 @@ public void testCircuitBreakerAccounting() throws IOException { try (FrozenEngine frozenEngine = new FrozenEngine(config)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertEquals(1, frozenEngine.getOpenedReaders()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertEquals(expectedUse, breaker.getUsed()); searcher.close(); assertEquals(0, breaker.getUsed()); @@ -193,16 +193,16 @@ public void testSearchConcurrently() throws IOException, InterruptedException { threads[i] = new Thread(() -> { try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { barrier.await(); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); for (int j = 0; j < numIters; j++) { - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(search.scoreDocs.length, 10); - frozenEngine.release(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); } if (randomBoolean()) { - frozenEngine.reset(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); + FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); } } catch (Exception e) { throw new AssertionError(e); From a35e79f635771fa49f0a4bac7251db5ef3822ef2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 16 Oct 2018 20:31:38 +0200 Subject: [PATCH 04/15] fix imports --- server/src/main/java/org/elasticsearch/index/engine/Engine.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index e2056dbd5b297..89545af641c28 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -65,7 +65,6 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ReleasableLock; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; From 5981a206d6f8b61ca55915d2945e23ccaa9c3114 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 17 Oct 2018 14:21:08 +0200 Subject: [PATCH 05/15] apply feedback --- .../core/internal/io/IOUtils.java | 13 ---- .../index/engine/FrozenEngine.java | 4 +- .../index/engine/FrozenEngineTests.java | 67 +++++++++---------- 3 files changed, 35 insertions(+), 49 deletions(-) diff --git a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java index 8796eff787dd0..46d19d2a814fe 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java +++ b/libs/core/src/main/java/org/elasticsearch/core/internal/io/IOUtils.java @@ -277,17 +277,4 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOEx throw ioe; } } - - /** - * An IO operation with a single input. - * @see java.util.function.Consumer - */ - @FunctionalInterface - public interface IOConsumer { - /** - * Performs this operation on the given argument. - */ - void accept(T input) throws IOException; - } - } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index d04c5d6ae9c46..7714abc3e281a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -201,7 +201,7 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. * - * This reader and it's leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still + * This reader and its leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers * on the next search phase. @@ -465,7 +465,7 @@ public LeafReader getDelegate() { } // TODO expose this as stats on master - long getOpenedReaders() { + long getTotalOpenedReaders() { return openedReaders.count(); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 94ba1d83344dc..f65109ec450d8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -32,9 +32,8 @@ public void testAcquireReleaseReset() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - int numDocs = scaledRandomIntBetween(10, 1000); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); @@ -42,17 +41,17 @@ public void testAcquireReleaseReset() throws IOException { assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher .getDirectoryReader()).shardId()); assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); - assertEquals(1, frozenEngine.getOpenedReaders()); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertFalse(frozenEngine.isReaderOpen()); - assertEquals(1, frozenEngine.getOpenedReaders()); - expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), 10)); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getOpenedReaders()); - search = searcher.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + assertEquals(2, frozenEngine.getTotalOpenedReaders()); + search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); searcher.close(); } } @@ -64,28 +63,27 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - int numDocs = scaledRandomIntBetween(10, 1000); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); - assertEquals(1, frozenEngine.getOpenedReaders()); + TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); - search = searcher2.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + search = searcher2.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); assertTrue(frozenEngine.isReaderOpen()); - assertEquals(2, frozenEngine.getOpenedReaders()); - expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), 10)); + assertEquals(2, frozenEngine.getTotalOpenedReaders()); + expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getOpenedReaders()); - search = searcher1.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + assertEquals(2, frozenEngine.getTotalOpenedReaders()); + search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); searcher1.close(); searcher2.close(); } @@ -98,19 +96,18 @@ public void testSegmentStats() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - int numDocs = scaledRandomIntBetween(10, 1000); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + addDocuments(globalCheckpoint, engine); engine.flushAndClose(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); - assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); @@ -128,11 +125,10 @@ public void testCircuitBreakerAccounting() throws IOException { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); - int numDocs = scaledRandomIntBetween(10, 1000); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); long expectedUse; try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + addDocuments(globalCheckpoint, engine); engine.refresh("test"); // pull the reader expectedUse = breaker.getUsed(); engine.flushAndClose(); @@ -143,7 +139,7 @@ public void testCircuitBreakerAccounting() throws IOException { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getOpenedReaders()); + assertEquals(1, frozenEngine.getTotalOpenedReaders()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); @@ -154,11 +150,14 @@ public void testCircuitBreakerAccounting() throws IOException { } } - private void addDocuments(AtomicLong globalCheckpoint, int numDocs, InternalEngine engine) throws IOException { + private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) throws IOException { + int numDocs = scaledRandomIntBetween(10, 1000); + int numDocsAdded = 0; for (int i = 0; i < numDocs; i++) { if (rarely()) { continue; // gap in sequence number } + numDocsAdded++; ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false)); @@ -168,6 +167,7 @@ private void addDocuments(AtomicLong globalCheckpoint, int numDocs, InternalEngi globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); } engine.syncTranslog(); + return numDocsAdded; } public void testSearchConcurrently() throws IOException, InterruptedException { @@ -178,10 +178,9 @@ public void testSearchConcurrently() throws IOException, InterruptedException { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); - int numDocs = scaledRandomIntBetween(10, 1000); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, numDocs, engine); + int numDocsAdded = addDocuments(globalCheckpoint, engine); engine.flushAndClose(); int numIters = randomIntBetween(100, 1000); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { @@ -197,8 +196,8 @@ public void testSearchConcurrently() throws IOException, InterruptedException { for (int j = 0; j < numIters; j++) { FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10); - assertEquals(search.scoreDocs.length, 10); + TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), Math.min(10, numDocsAdded)); + assertEquals(search.scoreDocs.length, Math.min(10, numDocsAdded)); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); } if (randomBoolean()) { From af08d6871a84c2bd1308da82f7a2be35e6ca80d8 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 5 Nov 2018 16:48:14 +0100 Subject: [PATCH 06/15] Fix compilation --- .../org/elasticsearch/index/engine/InternalEngineTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 379043fa93954..a0dd8191b6ed6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5057,15 +5057,15 @@ public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception { null, new ReferenceManager.RefreshListener() { @Override - public void beforeRefresh() throws IOException { + public void beforeRefresh() { refreshCounter.incrementAndGet(); } @Override - public void afterRefresh(boolean didRefresh) throws IOException { + public void afterRefresh(boolean didRefresh) { } - }, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) { + }, null, () -> SequenceNumbers.NO_OPS_PERFORMED, new NoneCircuitBreakerService()))) { for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) { final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); From 0a74147657b7a0d608485c5f56e3484bdefd772f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 20:44:18 +0100 Subject: [PATCH 07/15] fix compilation --- .../org/elasticsearch/index/engine/FrozenEngineTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index f65109ec450d8..a10fdf27df22d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -122,7 +122,7 @@ public void testCircuitBreakerAccounting() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); @@ -175,7 +175,7 @@ public void testSearchConcurrently() throws IOException, InterruptedException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); From 39412072578f89e4052e38b71010b82ce2a23893 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 21:15:58 +0100 Subject: [PATCH 08/15] apply review comments from @bleskes --- .../index/engine/ReadOnlyEngine.java | 8 +- .../index/engine/FrozenEngine.java | 77 +++++++++++++------ .../index/engine/FrozenEngineTests.java | 3 - 3 files changed, 59 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index b6443187de902..fc4b0632c8076 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -98,10 +98,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; - reader = open(directory); + this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); + reader = open(indexCommit); reader = wrapReader(reader, readerWrapperFunction); searcherManager = new SearcherManager(reader, searcherFactory); - this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); this.docsStats = docsStats(lastCommittedSegmentInfos); this.indexWriterLock = indexWriterLock; success = true; @@ -124,8 +124,8 @@ protected final DirectoryReader wrapReader(DirectoryReader reader, return readerWrapperFunction.apply(reader); } - protected DirectoryReader open(final Directory directory) throws IOException { - return DirectoryReader.open(directory); + protected DirectoryReader open(IndexCommit commit) throws IOException { + return DirectoryReader.open(commit); } private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 7714abc3e281a..6bb9d735195bb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -54,12 +54,11 @@ public FrozenEngine(EngineConfig config) { } @Override - protected DirectoryReader open(Directory directory) throws IOException { - // we fake an empty directly reader for the ReadOnlyEngine. this reader is only used + protected DirectoryReader open(IndexCommit indexCommit) throws IOException { + // we fake an empty DirectoryReader for the ReadOnlyEngine. this reader is only used // to initialize the reference manager and to make the refresh call happy which is essentially // a no-op now - IndexCommit indexCommit = Lucene.getIndexCommit(getLastCommittedSegmentInfos(), directory); - return new DirectoryReader(directory, new LeafReader[0]) { + return new DirectoryReader(indexCommit.getDirectory(), new LeafReader[0]) { @Override protected DirectoryReader doOpenIfChanged() { return null; @@ -103,6 +102,12 @@ public CacheHelper getReaderCacheHelper() { @SuppressForbidden(reason = "we manage references explicitly here") private synchronized void onReaderClosed(IndexReader.CacheKey key) { + // it might look awkward that we have to check here if the keys match but if we concurrently + // access the lastOpenedReader there might be 2 threads competing for the cached reference in + // a way that thread 1 counts down the lastOpenedReader reference and before thread 1 can execute + // the close listener we already open and assign a new reader to lastOpenedReader. In this case + // the cache key doesn't match and we just ignore it since we use this method only to null out the + // lastOpenedReader member to ensure resources can be GCed if (lastOpenedReader != null && key == lastOpenedReader.getReaderCacheHelper().getKey()) { assert lastOpenedReader.getRefCount() == 0; lastOpenedReader = null; @@ -110,19 +115,32 @@ private synchronized void onReaderClosed(IndexReader.CacheKey key) { } @SuppressForbidden(reason = "we manage references explicitly here") - private synchronized DirectoryReader getOrOpenReader(boolean doOpen) throws IOException { + private synchronized DirectoryReader getOrOpenReader() throws IOException { DirectoryReader reader = null; boolean success = false; try { - if (lastOpenedReader == null || lastOpenedReader.tryIncRef() == false) { - if (doOpen) { - reader = DirectoryReader.open(engineConfig.getStore().directory()); - searcherFactory.processReaders(reader, null); - openedReaders.inc(); - reader = lastOpenedReader = wrapReader(reader, Function.identity()); - reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); - } - } else { + reader = getReader(); + if (reader == null) { + reader = DirectoryReader.open(engineConfig.getStore().directory()); + searcherFactory.processReaders(reader, null); + openedReaders.inc(); + reader = lastOpenedReader = wrapReader(reader, Function.identity()); + reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); + } + success = true; + return reader; + } finally { + if (success == false) { + IOUtils.close(reader); + } + } + } + + private synchronized DirectoryReader getReader() throws IOException { + DirectoryReader reader = null; + boolean success = false; + try { + if (lastOpenedReader != null && lastOpenedReader.tryIncRef()) { reader = lastOpenedReader; } success = true; @@ -141,7 +159,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin store.incRef(); boolean success = false; try { - final boolean openReader; + final boolean maybeOpenReader; switch (source) { case "load_seq_no": case "load_version": @@ -152,18 +170,20 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin case "segments_stats": case "completion_stats": case "refresh_needed": - openReader = false; + maybeOpenReader = false; break; default: - openReader = true; + maybeOpenReader = true; } // special case we only want to report segment stats if we have a reader open. in that case we only get a reader if we still // have one open at the time and can inc it's reference. - DirectoryReader reader = getOrOpenReader(openReader); + DirectoryReader reader = maybeOpenReader ? getOrOpenReader() : getReader(); if (reader == null) { store.decRef(); success = true; - // we just hand out an empty searcher in this case + // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit) + // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in + // the category that doesn't trigger a reopen return super.acquireSearcher(source, scope); } else { try { @@ -201,7 +221,7 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. * - * This reader and its leave reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still + * This reader and its leaf reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers * on the next search phase. @@ -228,6 +248,10 @@ synchronized void release() throws IOException { delegate.decRef(); delegate = null; if (tryIncRef()) { // only do this if we are not closed already + // we end up in this case when we are not closed but in an intermediate + // state were we want to release all or the real leaf readers ie. in between search phases + // but still want to keep this Lazy reference open. In oder to let the heavy real leaf + // readers to be GCed we need to null our the references. try { for (LeafReaderContext leaf : leaves()) { LazyLeafReader reader = (LazyLeafReader) leaf.reader(); @@ -241,12 +265,21 @@ synchronized void release() throws IOException { } void reset() throws IOException { - reset(engine.getOrOpenReader(true)); + boolean success = false; + DirectoryReader reader = engine.getOrOpenReader(); + try { + reset(reader); + success = true; + } finally { + if (success == false) { + IOUtils.close(reader); + } + } } private synchronized void reset(DirectoryReader delegate) { if (this.delegate != null) { - throw new IllegalStateException("lazy reader is not released"); + throw new AssertionError("lazy reader is not released"); } assert (delegate instanceof LazyDirectoryReader) == false : "must not be a LazyDirectoryReader"; List leaves = delegate.leaves(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index a10fdf27df22d..92cfc2c9a7a2f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -154,9 +154,6 @@ private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) thr int numDocs = scaledRandomIntBetween(10, 1000); int numDocsAdded = 0; for (int i = 0; i < numDocs; i++) { - if (rarely()) { - continue; // gap in sequence number - } numDocsAdded++; ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, From bee497a7c6bfb075c42dd053611e93c5a5f68356 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 21:31:05 +0100 Subject: [PATCH 09/15] add more comments --- .../org/elasticsearch/index/engine/FrozenEngine.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 6bb9d735195bb..55a83ef8f1ee9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -44,6 +44,17 @@ * {@link Engine#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore and then * releases itself from the engine. The readers returned from this engine are lazy which allows release after and reset before a search * phase starts. This allows releasing references as soon as possible on the search layer. + * + * Internally this class uses a set of wrapper abstractions to allow a reader that is used inside the {@link Engine.Searcher} returned from + * {@link #acquireSearcher(String, SearcherScope)} to release and reset it's internal resources. This is necessary to for instance release + * all SegmentReaders after a search phase finishes and reopen them before the next search phase starts. This together with a throttled + * threadpool (search_throttled) guarantees that at most N frozen shards have a low level index reader open at the same time. + * + * In particular we have LazyDirectoryReader that wraps its LeafReaders (the actual segment readers) inside LazyLeafReaders. Each of the + * LazyLeafReader delegates to segment LeafReader that can be reset (it's reference decremented and nulled out) on a search phase is + * finished. Before the next search phase starts we can reopen the corresponding reader and reset the reference to execute the search phase. + * This allows the SearchContext to hold on to the same LazyDirectoryReader across its lifecycle but under the hood resources (memory) is + * released while the SearchContext phases are not executing. */ public final class FrozenEngine extends ReadOnlyEngine { private final CounterMetric openedReaders = new CounterMetric(); From 2ff209e6a268c36df28dcdb33ce988d5d696eed7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 21:44:45 +0100 Subject: [PATCH 10/15] add javadocs --- .../main/java/org/elasticsearch/common/lucene/Lucene.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 7a8ccb8adc090..5be6bed0d577f 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -220,7 +220,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc throw new IllegalStateException("no commit found in the directory"); } } - final CommitPoint cp = new CommitPoint(si, directory); + final IndexCommit cp = getIndexCommit(si, directory); try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER) .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) .setIndexCommit(cp) @@ -232,6 +232,9 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc return si; } + /** + * Returns an index commit for the given {@link SegmentInfos} in the given directory. + */ public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException { return new CommitPoint(si, directory); } From 6566ce493527413760fae0b59d76c3fdf2cd1461 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 6 Nov 2018 22:05:41 +0100 Subject: [PATCH 11/15] use refresh stats instead to track reopens --- .../index/engine/FrozenEngine.java | 21 +++--- .../index/engine/FrozenEngineTests.java | 64 +++++++++++++++---- 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 55a83ef8f1ee9..37c44ac6c31bd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -26,12 +26,11 @@ import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.index.Terms; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.core.internal.io.IOUtils; import java.io.IOException; @@ -54,10 +53,12 @@ * LazyLeafReader delegates to segment LeafReader that can be reset (it's reference decremented and nulled out) on a search phase is * finished. Before the next search phase starts we can reopen the corresponding reader and reset the reference to execute the search phase. * This allows the SearchContext to hold on to the same LazyDirectoryReader across its lifecycle but under the hood resources (memory) is - * released while the SearchContext phases are not executing. + * released while the SearchContext phases are not executing. + * + * The internal reopen of readers is treated like a refresh and refresh listeners are called up-on reopen. This allows to consume refresh + * stats in order to obtain the number of reopens. */ public final class FrozenEngine extends ReadOnlyEngine { - private final CounterMetric openedReaders = new CounterMetric(); private volatile DirectoryReader lastOpenedReader; public FrozenEngine(EngineConfig config) { @@ -132,11 +133,16 @@ private synchronized DirectoryReader getOrOpenReader() throws IOException { try { reader = getReader(); if (reader == null) { + for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) { + listeners.beforeRefresh(); + } reader = DirectoryReader.open(engineConfig.getStore().directory()); searcherFactory.processReaders(reader, null); - openedReaders.inc(); reader = lastOpenedReader = wrapReader(reader, Function.identity()); reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); + for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) { + listeners.afterRefresh(true); + } } success = true; return reader; @@ -508,11 +514,6 @@ public LeafReader getDelegate() { } } - // TODO expose this as stats on master - long getTotalOpenedReaders() { - return openedReaders.count(); - } - synchronized boolean isReaderOpen() { return lastOpenedReader != null; } // this is mainly for tests diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 92cfc2c9a7a2f..57b9ce1537442 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.index.engine; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -17,12 +18,14 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class FrozenEngineTests extends EngineTestCase { @@ -31,10 +34,13 @@ public void testAcquireReleaseReset() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); try (InternalEngine engine = createEngine(config)) { int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); @@ -43,13 +49,13 @@ public void testAcquireReleaseReset() throws IOException { assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); assertFalse(frozenEngine.isReaderOpen()); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); expectThrows(AlreadyClosedException.class, () -> searcher.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getTotalOpenedReaders()); + assertEquals(2, listener.afterRefresh.get()); search = searcher.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); searcher.close(); @@ -62,26 +68,29 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); try (InternalEngine engine = createEngine(config)) { int numDocs = Math.min(10, addDocuments(globalCheckpoint, engine)); engine.flushAndClose(); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { assertFalse(frozenEngine.isReaderOpen()); Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); search = searcher2.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); assertTrue(frozenEngine.isReaderOpen()); - assertEquals(2, frozenEngine.getTotalOpenedReaders()); + assertEquals(2, listener.afterRefresh.get()); expectThrows(AlreadyClosedException.class, () -> searcher1.searcher().search(new MatchAllDocsQuery(), numDocs)); FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); - assertEquals(2, frozenEngine.getTotalOpenedReaders()); + assertEquals(2, listener.afterRefresh.get()); search = searcher1.searcher().search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); searcher1.close(); @@ -95,19 +104,22 @@ public void testSegmentStats() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new NoneCircuitBreakerService()); try (InternalEngine engine = createEngine(config)) { addDocuments(globalCheckpoint, engine); engine.flushAndClose(); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); assertEquals(0, segmentsStats.getCount()); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); segmentsStats = frozenEngine.segmentsStats(randomBoolean()); @@ -122,8 +134,9 @@ public void testCircuitBreakerAccounting() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, null, globalCheckpoint::get, - new HierarchyCircuitBreakerService(defaultSettings.getSettings(), + CountingRefreshListener listener = new CountingRefreshListener(); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, listener, null, + globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); long expectedUse; @@ -135,11 +148,12 @@ public void testCircuitBreakerAccounting() throws IOException { } assertTrue(expectedUse > 0); assertEquals(0, breaker.getUsed()); + listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(config)) { Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); assertEquals(expectedUse, breaker.getUsed()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, frozenEngine.getTotalOpenedReaders()); + assertEquals(1, listener.afterRefresh.get()); assertEquals(0, breaker.getUsed()); assertFalse(frozenEngine.isReaderOpen()); FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); @@ -243,4 +257,26 @@ public void testOverrideMethods() throws Exception { checkOverrideMethods(FrozenEngine.LazyDirectoryReader.class); checkOverrideMethods(FrozenEngine.LazyLeafReader.class); } + + private class CountingRefreshListener implements ReferenceManager.RefreshListener { + + final AtomicInteger afterRefresh = new AtomicInteger(0); + private final AtomicInteger beforeRefresh = new AtomicInteger(0); + + @Override + public void beforeRefresh() { + beforeRefresh.incrementAndGet(); + } + + @Override + public void afterRefresh(boolean didRefresh) { + afterRefresh.incrementAndGet(); + assertEquals(beforeRefresh.get(), afterRefresh.get()); + } + + void reset() { + afterRefresh.set(0); + beforeRefresh.set(0); + } + } } From ea8a665ddf27f0a8dfc40fad99fb17a9a209a14a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 08:13:37 +0100 Subject: [PATCH 12/15] fix forbidden API check --- .../main/java/org/elasticsearch/index/engine/FrozenEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 37c44ac6c31bd..1434fb0ec4a85 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -126,7 +126,6 @@ private synchronized void onReaderClosed(IndexReader.CacheKey key) { } } - @SuppressForbidden(reason = "we manage references explicitly here") private synchronized DirectoryReader getOrOpenReader() throws IOException { DirectoryReader reader = null; boolean success = false; @@ -153,6 +152,7 @@ private synchronized DirectoryReader getOrOpenReader() throws IOException { } } + @SuppressForbidden(reason = "we manage references explicitly here") private synchronized DirectoryReader getReader() throws IOException { DirectoryReader reader = null; boolean success = false; From be71799cbd67977b57de79fd3bdbeca967529b72 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 16:32:13 +0100 Subject: [PATCH 13/15] make sure GCP is allways == LCP --- .../java/org/elasticsearch/index/engine/FrozenEngineTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 57b9ce1537442..2c16df79122eb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -175,7 +175,7 @@ private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) thr if (rarely()) { engine.flush(); } - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + globalCheckpoint.set(engine.getLocalCheckpoint()); } engine.syncTranslog(); return numDocsAdded; From 80b6827b7827dc1212624ee763225a3a585df0bc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 16:41:47 +0100 Subject: [PATCH 14/15] make store release more inituitive --- .../org/elasticsearch/index/engine/FrozenEngine.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 1434fb0ec4a85..4f8142a6d8ac7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -196,12 +196,15 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin // have one open at the time and can inc it's reference. DirectoryReader reader = maybeOpenReader ? getOrOpenReader() : getReader(); if (reader == null) { - store.decRef(); - success = true; // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit) // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in // the category that doesn't trigger a reopen - return super.acquireSearcher(source, scope); + try { + return super.acquireSearcher(source, scope); + } finally { + success = true; + store.decRef(); // this is the reference we acquired in the beginning of this method + } } else { try { LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this); From f24740827b22732a5cb097aae15c42241a63925a Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 7 Nov 2018 16:49:51 +0100 Subject: [PATCH 15/15] use a releaseReference pattern instead of success patter --- .../elasticsearch/index/engine/FrozenEngine.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 4f8142a6d8ac7..0cd67e5ebc505 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -174,7 +174,7 @@ private synchronized DirectoryReader getReader() throws IOException { @SuppressForbidden( reason = "we manage references explicitly here") public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { store.incRef(); - boolean success = false; + boolean releaseRefeference = true; try { final boolean maybeOpenReader; switch (source) { @@ -199,21 +199,16 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit) // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in // the category that doesn't trigger a reopen - try { - return super.acquireSearcher(source, scope); - } finally { - success = true; - store.decRef(); // this is the reference we acquired in the beginning of this method - } + return super.acquireSearcher(source, scope); } else { try { LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this); Searcher newSearcher = new Searcher(source, new IndexSearcher(lazyDirectoryReader), () -> IOUtils.close(lazyDirectoryReader, store::decRef)); - success = true; + releaseRefeference = false; return newSearcher; } finally { - if (success == false) { + if (releaseRefeference) { reader.decRef(); // don't call close here we manage reference ourselves } } @@ -221,7 +216,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } catch (IOException e) { throw new UncheckedIOException(e); } finally { - if (success == false) { + if (releaseRefeference) { store.decRef(); } }