
Use separate searchers for "search visibility" vs "move indexing buffer to disk" #26972

Merged (9 commits, Oct 12, 2017)
27 changes: 24 additions & 3 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -496,21 +496,38 @@ protected final GetResult getFromSearcher(Get get, Function<String, Searcher> se

public abstract GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException;


/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
*
* @see Searcher#close()
*/
public final Searcher acquireSearcher(String source) throws EngineException {
return acquireSearcher(source, SearcherScope.SEARCH);
}
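The Javadoc above puts the release obligation on the caller; a minimal sketch of that contract with toy stand-in types (not the real Elasticsearch `Engine.Searcher` API) looks like this:

```java
// Toy sketch of the acquire/release contract: the caller must release the
// searcher even when the work in between throws. ToySearcher and
// acquireSearcher are hypothetical stand-ins for illustration only.
import java.util.concurrent.atomic.AtomicInteger;

public class AcquireReleaseDemo {
    static final AtomicInteger openSearchers = new AtomicInteger();

    static class ToySearcher implements AutoCloseable {
        ToySearcher() { openSearchers.incrementAndGet(); }
        @Override public void close() { openSearchers.decrementAndGet(); }
    }

    static ToySearcher acquireSearcher(String source) {
        return new ToySearcher();
    }

    public static void main(String[] args) {
        // try-with-resources is the modern form of the try/finally the Javadoc asks for
        try (ToySearcher searcher = acquireSearcher("demo")) {
            // ... run the search against `searcher` ...
        }
        System.out.println("open searchers: " + openSearchers.get()); // prints "open searchers: 0"
    }
}
```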

/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
* @param scope the scope of this searcher ie. if the searcher will be used for get or search purposes
*
* @see Searcher#close()
*/
public final Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
boolean success = false;
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
store.incRef();
try {
final SearcherManager manager = getSearcherManager(); // can never be null
final SearcherManager manager = getSearcherManager(source, scope); // can never be null
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
@@ -536,6 +553,10 @@ public final Searcher acquireSearcher(String source) throws EngineException {
}
}

public enum SearcherScope {
SEARCH, GET
}
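The new enum is what lets the engine route an acquisition to the right searcher; a hedged sketch of that routing (toy class with strings standing in for the two managers, not the real engine code) could look like:

```java
// Hypothetical sketch of what the scope parameter buys: GET-scope
// acquisitions go to an internal, always-fresh manager (realtime get /
// version-map reads), SEARCH-scope ones to the external, user-refresh-driven
// manager. The string return values are stand-ins for SearcherManager fields.
public class ScopeDemo {
    enum SearcherScope { SEARCH, GET }

    static String acquireSearcher(String source, SearcherScope scope) {
        switch (scope) {
            case GET:
                return "internalSearcherManager"; // realtime get / version map reads
            case SEARCH:
                return "searcherManager";         // user-facing queries
            default:
                throw new IllegalStateException("unknown scope: " + scope);
        }
    }

    public static void main(String[] args) {
        System.out.println(acquireSearcher("realtime_get", SearcherScope.GET));
        System.out.println(acquireSearcher("query", SearcherScope.SEARCH));
    }
}
```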

/** returns the translog for this engine */
public abstract Translog getTranslog();

@@ -768,7 +789,7 @@ public final boolean refreshNeeded() {
the store is closed so we need to make sure we increment it here
*/
try {
return getSearcherManager().isSearcherCurrent() == false;
return getSearcherManager("refresh_needed", SearcherScope.SEARCH).isSearcherCurrent() == false;
} catch (IOException e) {
logger.error("failed to access searcher manager", e);
failEngine("failed to access searcher manager", e);
@@ -1306,7 +1327,7 @@ public void release() {
}
}

protected abstract SearcherManager getSearcherManager();
protected abstract SearcherManager getSearcherManager(String source, SearcherScope scope);

/**
* Method to close the engine while the write lock is held.
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -110,6 +110,7 @@ public class InternalEngine extends Engine {

private final SearcherFactory searcherFactory;
private final SearcherManager searcherManager;
private final SearcherManager internalSearcherManager;

private final Lock flushLock = new ReentrantLock();
private final ReentrantLock optimizeLock = new ReentrantLock();
@@ -164,6 +165,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
IndexWriter writer = null;
Translog translog = null;
SearcherManager manager = null;
SearcherManager internalSearcherManager = null;
EngineMergeScheduler scheduler = null;
boolean success = false;
try {
@@ -215,9 +217,11 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw e;
}
}
manager = createSearcherManager();
manager = createSearcherManager(searcherFactory);
Review comment (Contributor): Nit - now that we call createSearcherManager twice, shall we move the following line out of that method and into this constructor? It feels unrelated.

    lastCommittedSegmentInfos = readLastCommittedSegmentInfos(searcherManager, store);

Review comment (Contributor): Another nit - searcherFactory -> searcherFactoryForSearch?

internalSearcherManager = createSearcherManager(new SearcherFactory());
this.internalSearcherManager = internalSearcherManager;
Review comment (Contributor): Shall we name these by scope? i.e., getScopeSearcherManager?

this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
this.versionMap.setManager(internalSearcherManager);
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
// don't allow commits until we are done with recovering
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
@@ -227,7 +231,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler);
IOUtils.closeWhileHandlingException(writer, translog, manager, internalSearcherManager, scheduler);
versionMap.clear();
if (isClosed.get() == false) {
// failure we need to dec the store reference
@@ -441,7 +445,7 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force
return uuid;
}

private SearcherManager createSearcherManager() throws EngineException {
private SearcherManager createSearcherManager(SearcherFactory searcherFactory) throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
try {
@@ -482,7 +486,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.version, get.version()));
}
refresh("realtime_get");
refresh("realtime_get", false);
}
}

@@ -1187,17 +1191,26 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {

@Override
public void refresh(String source) throws EngineException {
refresh(source, true);
}

final void refresh(String source, boolean refreshExternal) throws EngineException {
Review comment (Contributor): Shall we make refreshExternal an EnumSet of SearcherScope?

// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
searcherManager.maybeRefreshBlocking();
internalSearcherManager.maybeRefreshBlocking();
if (refreshExternal) {
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
searcherManager.maybeRefreshBlocking();
}
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed", e);
failEngine("refresh failed source[" + source + "]", e);
Review comment (Contributor): ++

} catch (Exception inner) {
e.addSuppressed(inner);
}
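The comment in the new refresh body ("we really do the heavy-lifting only once") is the key semantic change: the internal manager is always refreshed, the external one only on a user-visible refresh. A toy model of that contract (hypothetical counters, not the real Lucene `SearcherManager`):

```java
// Toy model of the refresh(source, refreshExternal) contract introduced
// above: the internal manager is always refreshed so version-map/get reads
// stay realtime, while the external, search-visible manager is refreshed
// only when refreshExternal is true. Counters stand in for the managers.
public class ScopedRefreshDemo {
    static int internalRefreshes = 0;
    static int externalRefreshes = 0;

    static void refresh(String source, boolean refreshExternal) {
        internalRefreshes++;              // cheap: no warming, keeps GETs current
        if (refreshExternal) {
            externalRefreshes++;          // heavier: opens the searcher users query
        }
    }

    public static void main(String[] args) {
        refresh("write indexing buffer", false); // frees heap without changing search results
        refresh("api", true);                    // user-visible refresh touches both
        System.out.println(internalRefreshes + "/" + externalRefreshes); // prints "2/1"
    }
}
```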
@@ -1219,10 +1232,6 @@ public void writeIndexingBuffer() throws EngineException {
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();

// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two
// searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking
// refresh API), and another for version map interactions. See #15768.
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
final long indexingBufferBytes = indexWriter.ramBytesUsed();

@@ -1231,7 +1240,7 @@
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
refresh("write indexing buffer");
refresh("write indexing buffer", false);
Review comment (Contributor): I wonder if we can now always use this "refresh" path here rather than the IndexWriter.flush - refresh is now much lighter? (Not sure what other side effects there are.)

Reply (Author): +1 I think that's good

} else {
// Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush:
logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])",
@@ -1303,9 +1312,8 @@ final boolean tryRenewSyncCommit() {
throw new EngineException(shardId, "failed to renew sync commit", ex);
}
if (renewed) { // refresh outside of the write lock
refresh("renew sync commit");
refresh("renew sync commit"); // we have to refresh both searchers here to ensure we release unreferenced segments.
}

return renewed;
}

@@ -1347,7 +1355,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
commitIndexWriter(indexWriter, translog, null);
logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values
refresh("version_table_flush");
refresh("version_table_flush"); // TODO technically we could also only refresh the internal searcher
Review comment (Contributor): Doesn't the comment about releasing segments in tryRenewSyncCommit hold here too?

Reply (Author): I think I made some copy-paste mess here. I will fix.

translog.trimUnreferencedReaders();
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
@@ -1500,6 +1508,8 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
if (flush) {
if (tryRenewSyncCommit() == false) {
flush(false, true);
} else {
refresh("renew sync commit"); // we have to refresh both searchers here to ensure we release unreferenced segments.
Review comment (Contributor): Isn't tryRenewSyncCommit already doing it in this case?

Reply (Author): See my comment above, sorry for the noise.

}
}
if (upgrade) {
@@ -1652,7 +1662,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
try {
this.versionMap.clear();
try {
IOUtils.close(searcherManager);
IOUtils.close(searcherManager, internalSearcherManager);
} catch (Exception e) {
logger.warn("Failed to close SearcherManager", e);
}
@@ -1684,8 +1694,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
}

@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
protected SearcherManager getSearcherManager(String source, SearcherScope scope) {
switch (scope) {
case GET:
return internalSearcherManager;
case SEARCH:
return searcherManager;
default:
throw new IllegalStateException("unknonw scope: " + scope);
Review comment (Contributor): typo: unknown

}
}

private Releasable acquireLock(BytesRef uid) {
@@ -1867,6 +1884,10 @@ protected void doRun() throws Exception {
// free up transient disk usage of the (presumably biggish) segments that were just merged
if (tryRenewSyncCommit() == false) {
flush();
} else {
// we only refresh the rather cheap internal searcher manager in order to not trigger new data structures
// by accident, i.e. warm big segments in the parent/child case etc.
refresh("renew sync commit", false);
}

Review comment (Contributor): same comment - if tryRenewSyncCommit returned true, it already refreshed, no?
}
});
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -832,7 +832,7 @@ private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws I

public Engine.GetResult get(Engine.Get get) {
readAllowed();
return getEngine().get(get, this::acquireSearcher);
return getEngine().get(get, source -> this.acquireSearcher(source, Engine.SearcherScope.GET));
}

/**
@@ -1127,11 +1127,14 @@ public void failShard(String reason, @Nullable Exception e) {
// fail the engine. This will cause this shard to also be removed from the node's index service.
getEngine().failEngine(reason, e);
}

public Engine.Searcher acquireSearcher(String source) {
return acquireSearcher(source, Engine.SearcherScope.SEARCH);
}

private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) {
Review comment (Contributor): With the move to internal/external (rather than get/search), I think this method can go, and that also means that Engine.acquireSearcher(source, scope) can be made protected.

Reply (Contributor): Never mind. I see it now :(
readAllowed();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source);
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
boolean success = false;
try {
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(searcher);
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -86,6 +86,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
@@ -942,7 +943,7 @@ public void testConcurrentGetAndFlush() throws Exception {
engine.index(indexForDoc(doc));

final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory));
final AtomicBoolean flushFinished = new AtomicBoolean(false);
final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -977,7 +978,7 @@ public void testSimpleOperations() throws Exception {
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();

final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);

// create a document
Document document = testDocumentWithTextField();
@@ -1884,7 +1885,7 @@ class OpAndVersion {
ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null);
final Term uidTerm = newUid(doc);
engine.index(indexForDoc(doc));
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
for (int i = 0; i < thread.length; i++) {
thread[i] = new Thread(() -> {
startGun.countDown();
@@ -2314,7 +2315,7 @@ public void testEnableGcDeletes() throws Exception {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
engine.config().setEnableGcDeletes(false);

final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);

// Add document
Document document = testDocument();
@@ -3847,7 +3848,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
final Term uid = newUid(doc);
final Function<String, Searcher> searcherFactory = engine::acquireSearcher;
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET);
for (int i = 0; i < numberOfOperations; i++) {
if (randomBoolean()) {
final Engine.Index index = new Engine.Index(
@@ -4203,4 +4204,59 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
IOUtils.close(recoveringEngine);
}
}


public void assertSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
assertEquals(rightLeaves.size(), leftLeaves.size());
for (int i = 0; i < leftLeaves.size(); i++) {
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(i).reader());
}
}

public void assertNotSameReader(Searcher left, Searcher right) {
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves();
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves();
if (rightLeaves.size() == leftLeaves.size()) {
for (int i = 0; i < leftLeaves.size(); i++) {
if (leftLeaves.get(i).reader() != rightLeaves.get(i).reader()) {
return; // all is well
}
}
fail("readers are same");
}
}

public void testRefreshScopedSearcher() throws IOException {
Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET);
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH);
assertSameReader(getSearcher, searchSearcher);
IOUtils.close(getSearcher, searchSearcher);
Review comment (Contributor): nit - should we use try-with-resources to avoid leaking on failure?

Reply (Author): will do

for (int i = 0; i < 10; i++) {
final String docId = Integer.toString(i);
final ParsedDocument doc =
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc);
engine.index(primaryResponse);
}
assertTrue(engine.refreshNeeded());
engine.refresh("test", false);

getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET);
assertEquals(10, getSearcher.reader().numDocs());
searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH);
assertEquals(0, searchSearcher.reader().numDocs());
assertNotSameReader(getSearcher, searchSearcher);
IOUtils.close(getSearcher, searchSearcher);

engine.refresh("test", true);

getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET);
assertEquals(10, getSearcher.reader().numDocs());
searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH);
assertEquals(10, searchSearcher.reader().numDocs());
assertSameReader(getSearcher, searchSearcher);
IOUtils.close(getSearcher, searchSearcher);
}
}