Skip to content

Commit

Permalink
Ensure external refreshes will also refresh internal seacher to minim…
Browse files Browse the repository at this point in the history
…ize segment creation

We cut over to internal and external IndexReader/IndexSeacher in elastic#26972 which uses
two independent searcher managers. This has the downside that refreshes of the external
reader will never clear the internal version map which in-turn will trigger additional
and potentially unnecessary segment flushes since memory must be freed. Under heavy
indexing load with low refresh intervals this can cause excessive segment creation which
causes high GC activity and significantly increases the required segment merges.

This change adds a dedicated external reference manager that delegates refreshes to the
internal reference manager that then `steals` the refreshed reader from the internal
reference manager for external usage. This ensures that external and internal readers
are consistent on an external refresh. As a sideeffect this also releases old segments
referenced by the internal reference manager which can potentially hold on to already merged
away segments until it is refreshed due to a flush or indexing activity.
  • Loading branch information
s1monw committed Nov 3, 2017
1 parent b294250 commit eb62da7
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 35 deletions.
22 changes: 14 additions & 8 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -170,7 +170,7 @@ protected static boolean isMergedSegment(LeafReader reader) {
return IndexWriter.SOURCE_MERGE.equals(source);
}

protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) {
protected Searcher newSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager) {
return new EngineSearcher(source, searcher, manager, store, logger);
}

Expand Down Expand Up @@ -531,7 +531,7 @@ public final Searcher acquireSearcher(String source, SearcherScope scope) throws
* the searcher is acquired. */
store.incRef();
try {
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
final ReferenceManager<IndexSearcher> manager = getSearcherManager(source, scope); // can never be null
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
Expand Down Expand Up @@ -585,7 +585,7 @@ public CommitStats commitStats() {
/**
* Read the last segments info from the commit pointed to by the searcher manager
*/
protected static SegmentInfos readLastCommittedSegmentInfos(final SearcherManager sm, final Store store) throws IOException {
protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
IndexSearcher searcher = sm.acquire();
try {
IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit();
Expand Down Expand Up @@ -787,13 +787,19 @@ public int compare(Segment o1, Segment o2) {
public final boolean refreshNeeded() {
if (store.tryIncRef()) {
/*
we need to inc the store here since searcherManager.isSearcherCurrent()
acquires a searcher internally and that might keep a file open on the
we need to inc the store here since we acquire a searcher and that might keep a file open on the
store. this violates the assumption that all files are closed when
the store is closed so we need to make sure we increment it here
*/
try {
return getSearcherManager("refresh_needed", SearcherScope.EXTERNAL).isSearcherCurrent() == false;
ReferenceManager<IndexSearcher> manager = getSearcherManager("refresh_needed", SearcherScope.EXTERNAL);
final IndexSearcher searcher = manager.acquire();
try {
final IndexReader r = searcher.getIndexReader();
return ((DirectoryReader) r).isCurrent() == false;
} finally {
manager.release(searcher);
}
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
Expand Down Expand Up @@ -1331,7 +1337,7 @@ public void release() {
}
}

protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);
protected abstract ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope);

/**
* Method to close the engine while the write lock is held.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.index.store.Store;
Expand All @@ -32,12 +33,12 @@
* Searcher for an Engine
*/
public class EngineSearcher extends Engine.Searcher {
private final SearcherManager manager;
private final ReferenceManager<IndexSearcher> manager;
private final AtomicBoolean released = new AtomicBoolean(false);
private final Store store;
private final Logger logger;

public EngineSearcher(String source, IndexSearcher searcher, SearcherManager manager, Store store, Logger logger) {
public EngineSearcher(String source, IndexSearcher searcher, ReferenceManager<IndexSearcher> manager, Store store, Logger logger) {
super(source, searcher);
this.manager = manager;
this.store = store;
Expand Down
105 changes: 86 additions & 19 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
Expand Down Expand Up @@ -108,7 +107,7 @@ public class InternalEngine extends Engine {

private final IndexWriter indexWriter;

private final SearcherManager externalSearcherManager;
private final ExternalSearcherManager externalSearcherManager;
private final SearcherManager internalSearcherManager;

private final Lock flushLock = new ReentrantLock();
Expand Down Expand Up @@ -172,7 +171,7 @@ public InternalEngine(EngineConfig engineConfig) {
store.incRef();
IndexWriter writer = null;
Translog translog = null;
SearcherManager externalSearcherManager = null;
ExternalSearcherManager externalSearcherManager = null;
SearcherManager internalSearcherManager = null;
EngineMergeScheduler scheduler = null;
boolean success = false;
Expand Down Expand Up @@ -224,8 +223,9 @@ public InternalEngine(EngineConfig engineConfig) {
throw e;
}
}
internalSearcherManager = createSearcherManager(new SearcherFactory(), false);
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed, engineConfig), true);
externalSearcherManager = createSearcherManager(new SearchFactory(logger, isClosed,
engineConfig));
internalSearcherManager = externalSearcherManager.internalSearcherManager;
this.internalSearcherManager = internalSearcherManager;
this.externalSearcherManager = externalSearcherManager;
internalSearcherManager.addListener(versionMap);
Expand All @@ -238,7 +238,7 @@ public InternalEngine(EngineConfig engineConfig) {
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(writer, translog, externalSearcherManager, internalSearcherManager, scheduler);
IOUtils.closeWhileHandlingException(writer, translog, internalSearcherManager, externalSearcherManager, scheduler);
if (isClosed.get() == false) {
// failure we need to dec the store reference
store.decRef();
Expand All @@ -248,6 +248,74 @@ public InternalEngine(EngineConfig engineConfig) {
logger.trace("created new InternalEngine");
}

/**
* This reference manager delegates all it's refresh calls to another (internal) SearcherManager
* The main purpose for this is that if we have external refreshes happening we don't issue extra
* refreshes to clear version map memory etc. this can cause excessive segment creation if heavy indexing
* is happening and the refresh interval is low (ie. 1 sec)
*
* This also prevents segment starvation where an internal reader holds on to old segments literally forever
* since no indexing is happening and refreshes are only happening to the external reader manager, while with
* this specialized implementation an external refresh will immediately be reflected on the internal reader
* and old segments can be released in the same way previous version did this (as a side-effect of _refresh)
*/
private static final class ExternalSearcherManager extends ReferenceManager<IndexSearcher> {
private final SearcherFactory searcherFactory;
private final SearcherManager internalSearcherManager;

public ExternalSearcherManager(SearcherManager manager, SearcherFactory searcherFactory) throws IOException {
IndexSearcher acquire = manager.acquire();
try {
IndexReader indexReader = acquire.getIndexReader();
assert indexReader instanceof ElasticsearchDirectoryReader:
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + indexReader;
indexReader.incRef(); // steal the reader - getSearcher will decrement if it fails
current = SearcherManager.getSearcher(searcherFactory, indexReader, null);
} finally {
manager.release(acquire);
}
this.searcherFactory = searcherFactory;
this.internalSearcherManager = manager;
}

@Override
protected IndexSearcher refreshIfNeeded(IndexSearcher referenceToRefresh) throws IOException {
// we simply run a blocking refresh on the internal reference manager and then steal it's reader
// it's a save operation since we acquire the reader which incs it's reference but then down the road
// steal it by calling incRef on the "stolen" reader
internalSearcherManager.maybeRefreshBlocking();
IndexSearcher acquire = internalSearcherManager.acquire();
final IndexReader previousReader = referenceToRefresh.getIndexReader();
assert previousReader instanceof ElasticsearchDirectoryReader:
"searcher's IndexReader should be an ElasticsearchDirectoryReader, but got " + previousReader;
try {
final IndexReader newReader = acquire.getIndexReader();
if (newReader == previousReader) {
// nothing has changed - both ref managers share the same instance so we can use reference equality
return null;
} else {
newReader.incRef(); // steal the reader - getSearcher will decrement if it fails
return SearcherManager.getSearcher(searcherFactory, newReader, previousReader);
}
} finally {
internalSearcherManager.release(acquire);
}
}

@Override
protected boolean tryIncRef(IndexSearcher reference) {
return reference.getIndexReader().tryIncRef();
}

@Override
protected int getRefCount(IndexSearcher reference) {
return reference.getIndexReader().getRefCount();
}

@Override
protected void decRef(IndexSearcher reference) throws IOException { reference.getIndexReader().decRef(); }
}

@Override
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
Expand Down Expand Up @@ -456,18 +524,17 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force
return uuid;
}

private SearcherManager createSearcherManager(SearcherFactory searcherFactory, boolean readSegmentsInfo) throws EngineException {
private ExternalSearcherManager createSearcherManager(SearchFactory factory) throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
try {
try {
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
searcherManager = new SearcherManager(directoryReader, searcherFactory);
if (readSegmentsInfo) {
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
}
searcherManager = new SearcherManager(directoryReader, new SearcherFactory());
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(searcherManager, factory);
success = true;
return searcherManager;
return externalSearcherManager;
} catch (IOException e) {
maybeFailEngine("start", e);
try {
Expand Down Expand Up @@ -1229,24 +1296,24 @@ public void refresh(String source) throws EngineException {
}

final void refresh(String source, SearcherScope scope) throws EngineException {
long bytes = 0;
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.

// this will also cause version map ram to be freed hence we always account for it.
final long bytes = indexWriter.ramBytesUsed() + versionMap.ramBytesUsedForRefresh();
writingBytes.addAndGet(bytes);
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
bytes = indexWriter.ramBytesUsed();
switch (scope) {
case EXTERNAL:
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
writingBytes.addAndGet(bytes);
externalSearcherManager.maybeRefreshBlocking();
// the break here is intentional we never refresh both internal / external together
break;
case INTERNAL:
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
bytes += versionMapBytes;
writingBytes.addAndGet(bytes);
internalSearcherManager.maybeRefreshBlocking();
break;
default:
Expand Down Expand Up @@ -1717,7 +1784,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
}

@Override
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
protected ReferenceManager<IndexSearcher> getSearcherManager(String source, SearcherScope scope) {
switch (scope) {
case INTERNAL:
return internalSearcherManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4226,7 +4226,7 @@ public void assertSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
assertEquals(rightLeaves.size(), leftLeaves.size());
for (int i = 0; i < leftLeaves.size(); i++) {
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader());
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
}
}

Expand All @@ -4235,7 +4235,7 @@ public void assertNotSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
if (rightLeaves.size() == leftLeaves.size()) {
for (int i = 0; i < leftLeaves.size(); i++) {
if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) {
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
return; // all is well
}
}
Expand Down Expand Up @@ -4263,7 +4263,6 @@ public void testRefreshScopedSearcher() throws IOException {
assertEquals(0, searchSearcher.reader().numDocs());
assertNotSameReader(getSearcher, searchSearcher);
}

engine.refresh("test", Engine.SearcherScope.EXTERNAL);

try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Expand All @@ -4272,6 +4271,36 @@ public void testRefreshScopedSearcher() throws IOException {
assertEquals(10, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
}

// now ensure external refreshes are reflected on the internal reader
final String docId = Integer.toString(10);
final ParsedDocument doc =
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc);
engine.index(primaryResponse);

engine.refresh("test", Engine.SearcherScope.EXTERNAL);

try (Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertEquals(11, getSearcher.reader().numDocs());
assertEquals(11, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
}

try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
engine.refresh("test", Engine.SearcherScope.INTERNAL);
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)){
assertSame(searcher.searcher(), nextSearcher.searcher());
}
}

try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
engine.refresh("test", Engine.SearcherScope.EXTERNAL);
try (Searcher nextSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)){
assertSame(searcher.searcher(), nextSearcher.searcher());
}
}
}

public void testSeqNoGenerator() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -133,7 +134,8 @@ public CloseAction flushOrClose(CloseAction originalAction) throws IOException {
}
}

public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
public AssertingIndexSearcher newSearcher(String source, IndexSearcher searcher,
ReferenceManager<IndexSearcher> manager) throws EngineException {
IndexReader reader = searcher.getIndexReader();
IndexReader wrappedReader = reader;
assert reader != null;
Expand Down Expand Up @@ -182,7 +184,8 @@ public DirectoryReaderWrapper(DirectoryReader in, SubReaderWrapper subReaderWrap

}

public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher, SearcherManager manager) {
public Engine.Searcher wrapSearcher(String source, Engine.Searcher engineSearcher, IndexSearcher searcher,
ReferenceManager<IndexSearcher> manager) {
final AssertingIndexSearcher assertingIndexSearcher = newSearcher(source, searcher, manager);
assertingIndexSearcher.setSimilarity(searcher.getSimilarity(true));
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
Expand Down
Loading

0 comments on commit eb62da7

Please sign in to comment.