Skip to content

Commit

Permalink
Lock down Engine.Searcher (#34363)
Browse files Browse the repository at this point in the history
`Engine.Searcher` is non-final today which makes it error prone
in the case of wrapping the underlying reader or lucene `IndexSearcher`
like we do in `IndexSearcherWrapper`. Yet, there is no subclass of it yet
that would be dramatic to just drop on the floor. With the start of development
of frozen indices this changed since in #34357 functionality was added to
a subclass which would be dropped if a `IndexSearcherWrapper` is installed on an index.
This change locks down the `Engine.Searcher` to prevent such a functionality trap.
  • Loading branch information
s1monw authored and kcm committed Oct 30, 2018
1 parent 6f3e32e commit 0ab8317
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 180 deletions.
63 changes: 27 additions & 36 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -84,6 +83,7 @@
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -665,14 +665,23 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
Releasable releasable = store::decRef;
try {
ReferenceManager<IndexSearcher> referenceManager = getReferenceManager(scope);
Searcher engineSearcher = new Searcher(source, referenceManager.acquire(),
s -> {
try {
referenceManager.release(s);
} finally {
store.decRef();
}
}, logger);
IndexSearcher acquire = referenceManager.acquire();
AtomicBoolean released = new AtomicBoolean(false);
Searcher engineSearcher = new Searcher(source, acquire,
() -> {
if (released.compareAndSet(false, true)) {
try {
referenceManager.release(acquire);
} finally {
store.decRef();
}
} else {
/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short
* amount of time, this is why we only log a warning instead of throwing an exception. */
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
}
});
releasable = null; // success - hand over the reference to the engine searcher
return engineSearcher;
} catch (AlreadyClosedException ex) {
Expand Down Expand Up @@ -1175,69 +1184,51 @@ default void onFailedEngine(String reason, @Nullable Exception e) {
}
}

public static class Searcher implements Releasable {
public static final class Searcher implements Releasable {
private final String source;
private final IndexSearcher searcher;
private final AtomicBoolean released = new AtomicBoolean(false);
private final Logger logger;
private final IOUtils.IOConsumer<IndexSearcher> onClose;
private final Closeable onClose;

public Searcher(String source, IndexSearcher searcher, Logger logger) {
this(source, searcher, s -> s.getIndexReader().close(), logger);
}

public Searcher(String source, IndexSearcher searcher, IOUtils.IOConsumer<IndexSearcher> onClose, Logger logger) {
public Searcher(String source, IndexSearcher searcher, Closeable onClose) {
this.source = source;
this.searcher = searcher;
this.onClose = onClose;
this.logger = logger;
}

/**
* The source that caused this searcher to be acquired.
*/
public final String source() {
public String source() {
return source;
}

public final IndexReader reader() {
public IndexReader reader() {
return searcher.getIndexReader();
}

public final DirectoryReader getDirectoryReader() {
public DirectoryReader getDirectoryReader() {
if (reader() instanceof DirectoryReader) {
return (DirectoryReader) reader();
}
throw new IllegalStateException("Can't use " + reader().getClass() + " as a directory reader");
}

public final IndexSearcher searcher() {
public IndexSearcher searcher() {
return searcher;
}

@Override
public void close() {
if (released.compareAndSet(false, true) == false) {
/* In general, searchers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short amount
* of time, this is why we only log a warning instead of throwing an exception.
*/
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
return;
}
try {
onClose.accept(searcher());
onClose.close();
} catch (IOException e) {
throw new IllegalStateException("Cannot close", e);
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
}
}

public final Logger getLogger() {
return logger;
}
}

public abstract static class Operation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
// in the case of a already pruned translog generation we might get null here - yet very unlikely
TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig
.getIndexSettings().getIndexVersionCreated());
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), logger),
return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0));
}
} catch (IOException e) {
Expand Down Expand Up @@ -2086,7 +2086,7 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader)
if (warmer != null) {
try {
assert searcher.getIndexReader() instanceof ElasticsearchDirectoryReader : "this class needs an ElasticsearchDirectoryReader but got: " + searcher.getIndexReader().getClass();
warmer.warm(new Searcher("top_reader_warming", searcher, s -> {}, logger));
warmer.warm(new Searcher("top_reader_warming", searcher, () -> {}));
} catch (Exception e) {
if (isEngineClosed.get() == false) {
logger.warn("failed to prepare/warm", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ public final Engine.Searcher wrap(Engine.Searcher engineSearcher) throws IOExcep
} else {
// we close the reader to make sure wrappers can release resources if needed....
// our NonClosingReaderWrapper makes sure that our reader is not closed
return new Engine.Searcher(engineSearcher.source(), indexSearcher, s -> IOUtils.close(s.getIndexReader(), engineSearcher),
engineSearcher.getLogger());
return new Engine.Searcher(engineSearcher.source(), indexSearcher, () ->
IOUtils.close(indexSearcher.getIndexReader(), // this will close the wrappers excluding the NonClosingReaderWrapper
engineSearcher)); // this will run the closeable on the wrapped engine searcher
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class IndexSearcherWrapperTests extends ESTestCase {
Expand Down Expand Up @@ -73,20 +74,20 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
final int sourceRefCount = open.getRefCount();
final AtomicInteger count = new AtomicInteger();
final AtomicInteger outerCount = new AtomicInteger();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
final Engine.Searcher wrap = wrapper.wrap(engineSearcher);
assertEquals(1, wrap.reader().getRefCount());
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
if (key == open.getReaderCacheHelper().getKey()) {
count.incrementAndGet();
}
outerCount.incrementAndGet();
});
assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
wrap.close();
assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
assertEquals(sourceRefCount, open.getRefCount());
}
final AtomicBoolean closeCalled = new AtomicBoolean(false);
final Engine.Searcher wrap = wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true)));
assertEquals(1, wrap.reader().getRefCount());
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
if (key == open.getReaderCacheHelper().getKey()) {
count.incrementAndGet();
}
outerCount.incrementAndGet();
});
assertEquals(0, wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
wrap.close();
assertFalse("wrapped reader is closed", wrap.reader().tryIncRef());
assertEquals(sourceRefCount, open.getRefCount());
assertTrue(closeCalled.get());
assertEquals(1, closeCalls.get());

IOUtils.close(open, writer, dir);
Expand Down Expand Up @@ -121,15 +122,15 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
}
};
final ConcurrentHashMap<Object, TopDocs> cache = new ConcurrentHashMap<>();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, s -> {}, logger)) {
try (Engine.Searcher wrap = wrapper.wrap(engineSearcher)) {
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
cache.remove(key);
});
TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
cache.put(wrap.reader().getReaderCacheHelper().getKey(), search);
}
AtomicBoolean closeCalled = new AtomicBoolean(false);
try (Engine.Searcher wrap = wrapper.wrap(new Engine.Searcher("foo", searcher, () -> closeCalled.set(true)))) {
ElasticsearchDirectoryReader.addReaderCloseListener(wrap.getDirectoryReader(), key -> {
cache.remove(key);
});
TopDocs search = wrap.searcher().search(new TermQuery(new Term("field", "doc")), 1);
cache.put(wrap.reader().getReaderCacheHelper().getKey(), search);
}
assertTrue(closeCalled.get());
assertEquals(1, closeCalls.get());

assertEquals(1, cache.size());
Expand All @@ -151,11 +152,11 @@ public void testNoWrap() throws IOException {
assertEquals(1, searcher.search(new TermQuery(new Term("field", "doc")), 1).totalHits.value);
searcher.setSimilarity(iwc.getSimilarity());
IndexSearcherWrapper wrapper = new IndexSearcherWrapper();
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, logger)) {
try (Engine.Searcher engineSearcher = new Engine.Searcher("foo", searcher, open::close)) {
final Engine.Searcher wrap = wrapper.wrap(engineSearcher);
assertSame(wrap, engineSearcher);
}
IOUtils.close(open, writer, dir);
IOUtils.close(writer, dir);
}

private static class FieldMaskingReader extends FilterDirectoryReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testPreProcess() throws Exception {
try (Directory dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
IndexReader reader = w.getReader();
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), logger)) {
Engine.Searcher searcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close)) {

DefaultSearchContext context1 = new DefaultSearchContext(1L, shardSearchRequest, null, searcher, null, indexService,
indexShard, bigArrays, null, timeout, null, null, Version.CURRENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void testUseIndexStats() throws IOException {

public void testApproximations() throws IOException {
QueryProfiler profiler = new QueryProfiler();
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), logger);
Engine.Searcher engineSearcher = new Engine.Searcher("test", new IndexSearcher(reader), reader::close);
// disable query caching since we want to test approximations, which won't
// be exposed on a cached entry
ContextIndexSearcher searcher = new ContextIndexSearcher(engineSearcher, null, MAYBE_CACHE_POLICY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,15 +240,15 @@ protected <A extends Aggregator> A createAggregator(Query query,
}

protected SearchContext createSearchContext(IndexSearcher indexSearcher, IndexSettings indexSettings) {
Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, logger);
Engine.Searcher searcher = new Engine.Searcher("aggregator_test", indexSearcher, () -> indexSearcher.getIndexReader().close());
QueryCache queryCache = new DisabledQueryCache(indexSettings);
QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() {
@Override
public void onUse(Query query) {
}

@Override
public boolean shouldCache(Query query) throws IOException {
public boolean shouldCache(Query query) {
// never cache a query
return false;
}
Expand Down

This file was deleted.

Loading

0 comments on commit 0ab8317

Please sign in to comment.