-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use separate searchers for "search visibility" vs "move indexing buffer to disk" #26972
Changes from 2 commits
9dd892a
3cc471a
7076c9b
d308248
56349a9
6cc0a8b
5535320
dfa6122
17d1cc8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,6 +110,7 @@ public class InternalEngine extends Engine { | |
|
||
private final SearcherFactory searcherFactory; | ||
private final SearcherManager searcherManager; | ||
private final SearcherManager internalSearcherManager; | ||
|
||
private final Lock flushLock = new ReentrantLock(); | ||
private final ReentrantLock optimizeLock = new ReentrantLock(); | ||
|
@@ -164,6 +165,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
IndexWriter writer = null; | ||
Translog translog = null; | ||
SearcherManager manager = null; | ||
SearcherManager internalSearcherManager = null; | ||
EngineMergeScheduler scheduler = null; | ||
boolean success = false; | ||
try { | ||
|
@@ -215,9 +217,11 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
throw e; | ||
} | ||
} | ||
manager = createSearcherManager(); | ||
manager = createSearcherManager(searcherFactory); | ||
internalSearcherManager = createSearcherManager(new SearcherFactory()); | ||
this.internalSearcherManager = internalSearcherManager; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shall we name these by scope? i.e., getScopeSearcherManager? |
||
this.searcherManager = manager; | ||
this.versionMap.setManager(searcherManager); | ||
this.versionMap.setManager(internalSearcherManager); | ||
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; | ||
// don't allow commits until we are done with recovering | ||
pendingTranslogRecovery.set(openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); | ||
|
@@ -227,7 +231,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
success = true; | ||
} finally { | ||
if (success == false) { | ||
IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler); | ||
IOUtils.closeWhileHandlingException(writer, translog, manager, internalSearcherManager, scheduler); | ||
versionMap.clear(); | ||
if (isClosed.get() == false) { | ||
// failure we need to dec the store reference | ||
|
@@ -441,7 +445,7 @@ private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean force | |
return uuid; | ||
} | ||
|
||
private SearcherManager createSearcherManager() throws EngineException { | ||
private SearcherManager createSearcherManager(SearcherFactory searcherFactory) throws EngineException { | ||
boolean success = false; | ||
SearcherManager searcherManager = null; | ||
try { | ||
|
@@ -482,7 +486,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws | |
throw new VersionConflictEngineException(shardId, get.type(), get.id(), | ||
get.versionType().explainConflictForReads(versionValue.version, get.version())); | ||
} | ||
refresh("realtime_get"); | ||
refresh("realtime_get", false); | ||
} | ||
} | ||
|
||
|
@@ -1187,17 +1191,26 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { | |
|
||
@Override | ||
public void refresh(String source) throws EngineException { | ||
refresh(source, true); | ||
} | ||
|
||
final void refresh(String source, boolean refreshExternal) throws EngineException { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shall we make the refreshExternal an EnumSet of SearcherScope? |
||
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing | ||
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) | ||
try (ReleasableLock lock = readLock.acquire()) { | ||
ensureOpen(); | ||
searcherManager.maybeRefreshBlocking(); | ||
internalSearcherManager.maybeRefreshBlocking(); | ||
if (refreshExternal) { | ||
// even though we maintain 2 managers we really do the heavy-lifting only once. | ||
// the second refresh will only do the extra work we have to do for warming caches etc. | ||
searcherManager.maybeRefreshBlocking(); | ||
} | ||
} catch (AlreadyClosedException e) { | ||
failOnTragicEvent(e); | ||
throw e; | ||
} catch (Exception e) { | ||
try { | ||
failEngine("refresh failed", e); | ||
failEngine("refresh failed source[" + source + "]", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ++ |
||
} catch (Exception inner) { | ||
e.addSuppressed(inner); | ||
} | ||
|
@@ -1219,10 +1232,6 @@ public void writeIndexingBuffer() throws EngineException { | |
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) | ||
try (ReleasableLock lock = readLock.acquire()) { | ||
ensureOpen(); | ||
|
||
// TODO: it's not great that we secretly tie searcher visibility to "freeing up heap" here... really we should keep two | ||
// searcher managers, one for searching which is only refreshed by the schedule the user requested (refresh_interval, or invoking | ||
// refresh API), and another for version map interactions. See #15768. | ||
final long versionMapBytes = versionMap.ramBytesUsedForRefresh(); | ||
final long indexingBufferBytes = indexWriter.ramBytesUsed(); | ||
|
||
|
@@ -1231,7 +1240,7 @@ public void writeIndexingBuffer() throws EngineException { | |
// The version map is using > 25% of the indexing buffer, so we do a refresh so the version map also clears | ||
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])", | ||
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes)); | ||
refresh("write indexing buffer"); | ||
refresh("write indexing buffer", false); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder if we can now always use this "refresh" path here rather than the IndexWriter.flush path? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1, I think that's good. |
||
} else { | ||
// Most of our heap is used by the indexing buffer, so we do a cheaper (just writes segments, doesn't open a new searcher) IW.flush: | ||
logger.debug("use IndexWriter.flush to write indexing buffer (heap size=[{}]) since version map is small (heap size=[{}])", | ||
|
@@ -1303,9 +1312,8 @@ final boolean tryRenewSyncCommit() { | |
throw new EngineException(shardId, "failed to renew sync commit", ex); | ||
} | ||
if (renewed) { // refresh outside of the write lock | ||
refresh("renew sync commit"); | ||
refresh("renew sync commit"); // we have to refresh both searchers here to ensure we release unreferenced segments. | ||
} | ||
|
||
return renewed; | ||
} | ||
|
||
|
@@ -1347,7 +1355,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti | |
commitIndexWriter(indexWriter, translog, null); | ||
logger.trace("finished commit for flush"); | ||
// we need to refresh in order to clear older version values | ||
refresh("version_table_flush"); | ||
refresh("version_table_flush"); // TODO technically we could also only refresh the internal searcher | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Doesn't the comment about releasing segments in tryRenewSyncCommit hold here too? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think I made a copy-paste mess here. I will fix it. |
||
translog.trimUnreferencedReaders(); | ||
} catch (Exception e) { | ||
throw new FlushFailedEngineException(shardId, e); | ||
|
@@ -1500,6 +1508,8 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu | |
if (flush) { | ||
if (tryRenewSyncCommit() == false) { | ||
flush(false, true); | ||
} else { | ||
refresh("renew sync commit"); // we have to refresh both searchers here to ensure we release unreferenced segments. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Isn't tryRenewSyncCommit already doing it in this case? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See my comment above; sorry for the noise. |
||
} | ||
} | ||
if (upgrade) { | ||
|
@@ -1652,7 +1662,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { | |
try { | ||
this.versionMap.clear(); | ||
try { | ||
IOUtils.close(searcherManager); | ||
IOUtils.close(searcherManager, internalSearcherManager); | ||
} catch (Exception e) { | ||
logger.warn("Failed to close SearcherManager", e); | ||
} | ||
|
@@ -1684,8 +1694,15 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { | |
} | ||
|
||
@Override | ||
protected SearcherManager getSearcherManager() { | ||
return searcherManager; | ||
protected SearcherManager getSearcherManager(String source, SearcherScope scope) { | ||
switch (scope) { | ||
case GET: | ||
return internalSearcherManager; | ||
case SEARCH: | ||
return searcherManager; | ||
default: | ||
throw new IllegalStateException("unknonw scope: " + scope); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. typo: unknown |
||
} | ||
} | ||
|
||
private Releasable acquireLock(BytesRef uid) { | ||
|
@@ -1867,6 +1884,10 @@ protected void doRun() throws Exception { | |
// free up transient disk usage of the (presumably biggish) segments that were just merged | ||
if (tryRenewSyncCommit() == false) { | ||
flush(); | ||
} else { | ||
// we only refresh the rather cheap internal searcher manager in order to not trigger new datastructures | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. same comment - if |
||
// by accident ie. warm big segments in parent child case etc. | ||
refresh("renew sync commit", false); | ||
} | ||
} | ||
}); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -832,7 +832,7 @@ private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws I | |
|
||
public Engine.GetResult get(Engine.Get get) { | ||
readAllowed(); | ||
return getEngine().get(get, this::acquireSearcher); | ||
return getEngine().get(get, source -> this.acquireSearcher(source, Engine.SearcherScope.GET)); | ||
} | ||
|
||
/** | ||
|
@@ -1127,11 +1127,14 @@ public void failShard(String reason, @Nullable Exception e) { | |
// fail the engine. This will cause this shard to also be removed from the node's index service. | ||
getEngine().failEngine(reason, e); | ||
} | ||
|
||
public Engine.Searcher acquireSearcher(String source) { | ||
return acquireSearcher(source, Engine.SearcherScope.SEARCH); | ||
} | ||
|
||
private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scope) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. With the move to internal/external (rather than get/search), I think this method can go, and that also means that Engine.acquireSearcher(source, scope) can be made protected. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Never mind. I see it now :( |
||
readAllowed(); | ||
final Engine engine = getEngine(); | ||
final Engine.Searcher searcher = engine.acquireSearcher(source); | ||
final Engine.Searcher searcher = engine.acquireSearcher(source, scope); | ||
boolean success = false; | ||
try { | ||
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(searcher); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,6 +86,7 @@ | |
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.logging.Loggers; | ||
import org.elasticsearch.common.lucene.Lucene; | ||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; | ||
import org.elasticsearch.common.lucene.uid.Versions; | ||
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; | ||
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; | ||
|
@@ -942,7 +943,7 @@ public void testConcurrentGetAndFlush() throws Exception { | |
engine.index(indexForDoc(doc)); | ||
|
||
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>(); | ||
final Function<String, Searcher> searcherFactory = engine::acquireSearcher; | ||
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET); | ||
latestGetResult.set(engine.get(newGet(true, doc), searcherFactory)); | ||
final AtomicBoolean flushFinished = new AtomicBoolean(false); | ||
final CyclicBarrier barrier = new CyclicBarrier(2); | ||
|
@@ -977,7 +978,7 @@ public void testSimpleOperations() throws Exception { | |
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0)); | ||
searchResult.close(); | ||
|
||
final Function<String, Searcher> searcherFactory = engine::acquireSearcher; | ||
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET); | ||
|
||
// create a document | ||
Document document = testDocumentWithTextField(); | ||
|
@@ -1884,7 +1885,7 @@ class OpAndVersion { | |
ParsedDocument doc = testParsedDocument("1", null, testDocument(), bytesArray(""), null); | ||
final Term uidTerm = newUid(doc); | ||
engine.index(indexForDoc(doc)); | ||
final Function<String, Searcher> searcherFactory = engine::acquireSearcher; | ||
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET); | ||
for (int i = 0; i < thread.length; i++) { | ||
thread[i] = new Thread(() -> { | ||
startGun.countDown(); | ||
|
@@ -2314,7 +2315,7 @@ public void testEnableGcDeletes() throws Exception { | |
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { | ||
engine.config().setEnableGcDeletes(false); | ||
|
||
final Function<String, Searcher> searcherFactory = engine::acquireSearcher; | ||
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET); | ||
|
||
// Add document | ||
Document document = testDocument(); | ||
|
@@ -3847,7 +3848,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio | |
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); | ||
final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); | ||
final Term uid = newUid(doc); | ||
final Function<String, Searcher> searcherFactory = engine::acquireSearcher; | ||
final Function<String, Searcher> searcherFactory = s -> engine.acquireSearcher(s, Engine.SearcherScope.GET); | ||
for (int i = 0; i < numberOfOperations; i++) { | ||
if (randomBoolean()) { | ||
final Engine.Index index = new Engine.Index( | ||
|
@@ -4203,4 +4204,59 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException { | |
IOUtils.close(recoveringEngine); | ||
} | ||
} | ||
|
||
|
||
public void assertSameReader(Searcher left, Searcher right) { | ||
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves(); | ||
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); | ||
assertEquals(rightLeaves.size(), leftLeaves.size()); | ||
for (int i = 0; i < leftLeaves.size(); i++) { | ||
assertSame(leftLeaves.get(i).reader(), rightLeaves.get(0).reader()); | ||
} | ||
} | ||
|
||
public void assertNotSameReader(Searcher left, Searcher right) { | ||
List<LeafReaderContext> leftLeaves = ElasticsearchDirectoryReader.unwrap(left.getDirectoryReader()).leaves(); | ||
List<LeafReaderContext> rightLeaves = ElasticsearchDirectoryReader.unwrap(right.getDirectoryReader()).leaves(); | ||
if (rightLeaves.size() == leftLeaves.size()) { | ||
for (int i = 0; i < leftLeaves.size(); i++) { | ||
if (leftLeaves.get(i).reader() != rightLeaves.get(0).reader()) { | ||
return; // all is well | ||
} | ||
} | ||
fail("readers are same"); | ||
} | ||
} | ||
|
||
public void testRefreshScopedSearcher() throws IOException { | ||
Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET); | ||
Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH); | ||
assertSameReader(getSearcher, searchSearcher); | ||
IOUtils.close(getSearcher, searchSearcher); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit - should we use try-with-resources to avoid leaking on failure? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will do. |
||
for (int i = 0; i < 10; i++) { | ||
final String docId = Integer.toString(i); | ||
final ParsedDocument doc = | ||
testParsedDocument(docId, null, testDocumentWithTextField(), SOURCE, null); | ||
Engine.Index primaryResponse = indexForDoc(doc); | ||
engine.index(primaryResponse); | ||
} | ||
assertTrue(engine.refreshNeeded()); | ||
engine.refresh("test", false); | ||
|
||
getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET); | ||
assertEquals(10, getSearcher.reader().numDocs()); | ||
searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH); | ||
assertEquals(0, searchSearcher.reader().numDocs()); | ||
assertNotSameReader(getSearcher, searchSearcher); | ||
IOUtils.close(getSearcher, searchSearcher); | ||
|
||
engine.refresh("test", true); | ||
|
||
getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.GET); | ||
assertEquals(10, getSearcher.reader().numDocs()); | ||
searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.SEARCH); | ||
assertEquals(10, searchSearcher.reader().numDocs()); | ||
assertSameReader(getSearcher, searchSearcher); | ||
IOUtils.close(getSearcher, searchSearcher); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit - now that we call createSearcherManager twice, shall we move the following line out of that method and into this constructor? It feels unrelated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another nit - searcherFactory -> searcherFactoryForSearch?