Skip to content

Commit

Permalink
Ensure either success or failure path for SearchOperationListener is …
Browse files Browse the repository at this point in the history
…called (#37467)

Today we have several implementations of executing SearchOperationListener
in SearchService. While all of them seem to be safe at least on, the one that
executes scroll searches can cause illegal execution of SearchOperationListener
that can then in-turn trigger assertions in ShardSearchStats. This change
adds a SearchOperationListenerExecutor that uses try-with blocks to ensure
listeners are called in a safe way.

Relates to #37185
  • Loading branch information
s1monw committed Jan 23, 2019
1 parent daf95b5 commit 4f0a1fa
Showing 1 changed file with 95 additions and 76 deletions.
171 changes: 95 additions & 76 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchTask;
Expand Down Expand Up @@ -332,7 +331,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask t
} catch (Exception e) {
logger.trace("Dfs phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand Down Expand Up @@ -383,29 +382,24 @@ protected void doRun() {
});
}

private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
boolean queryPhaseSuccess = false;
try {
context.setTask(task);
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);

loadOrExecuteQueryPhase(request, context);

if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
contextProcessing(context);
loadOrExecuteQueryPhase(request, context);
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
}
afterQueryTime = executor.success();
}
final long afterQueryTime = System.nanoTime();
queryPhaseSuccess = true;
operationListener.onQueryPhase(context, afterQueryTime - time);
if (request.numberOfShards() == 1) {
return executeFetchPhase(context, operationListener, afterQueryTime);
return executeFetchPhase(context, afterQueryTime);
}
return context.queryResult();
} catch (Exception e) {
Expand All @@ -414,56 +408,44 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa
e = (e.getCause() == null || e.getCause() instanceof Exception) ?
(Exception) e.getCause() : new ElasticsearchException(e.getCause());
}
if (!queryPhaseSuccess) {
operationListener.onFailedQueryPhase(context);
}
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
}

private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener,
long afterQueryTime) {
operationListener.onPreFetchPhase(context);
try {
private QueryFetchSearchResult executeFetchPhase(SearchContext context, long afterQueryTime) {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
}
} catch (Exception e) {
operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
executor.success();
}
operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}

public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener<ScrollQuerySearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
context.setTask(task);
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);
processScroll(request, context);
queryPhase.execute(context);
contextProcessedSuccessfully(context);
operationListener.onQueryPhase(context, System.nanoTime() - time);
executor.success();
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
} catch (Exception e) {
operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand All @@ -474,29 +456,23 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
context.setTask(task);
IndexShard indexShard = context.indexShard();
SearchOperationListener operationListener = indexShard.getSearchOperationListener();
context.incRef();
try {
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());

operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
// no hits, we can release the context since there will be no fetch phase
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
}
operationListener.onQueryPhase(context, System.nanoTime() - time);
executor.success();
return context.queryResult();
} catch (Exception e) {
operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand Down Expand Up @@ -530,28 +506,19 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
ActionListener<ScrollQueryFetchSearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
context.setTask(task);
context.incRef();
try {
context.setTask(task);
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){
contextProcessing(context);
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
processScroll(request, context);
operationListener.onPreQueryPhase(context);
final long time = System.nanoTime();
try {
queryPhase.execute(context);
} catch (Exception e) {
operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
long afterQueryTime = System.nanoTime();
operationListener.onQueryPhase(context, afterQueryTime - time);
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
queryPhase.execute(context);
final long afterQueryTime = executor.success();
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime);
return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget());
} catch (Exception e) {
logger.trace("Fetch phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand All @@ -561,7 +528,6 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener<FetchSearchResult> listener) {
runAsync(request.id(), () -> {
final SearchContext context = findContext(request.id(), request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
context.setTask(task);
Expand All @@ -570,21 +536,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchTask task, Action
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
}
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
operationListener.onPreFetchPhase(context);
long time = System.nanoTime();
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
freeContext(request.id());
} else {
contextProcessedSuccessfully(context);
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
freeContext(request.id());
} else {
contextProcessedSuccessfully(context);
}
executor.success();
}
operationListener.onFetchPhase(context, System.nanoTime() - time);
return context.fetchResult();
} catch (Exception e) {
operationListener.onFailedFetchPhase(context);
logger.trace("Fetch phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
throw e;
} finally {
cleanContext(context);
}
Expand Down Expand Up @@ -676,7 +641,7 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException
context.lowLevelCancellation(lowLevelCancellation);
} catch (Exception e) {
context.close();
throw ExceptionsHelper.convertToRuntime(e);
throw e;
}

return context;
Expand Down Expand Up @@ -750,7 +715,7 @@ public void freeAllScrollContexts() {
}
}

private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
private void contextScrollKeepAlive(SearchContext context, long keepAlive) {
if (keepAlive > maxKeepAlive) {
throw new QueryPhaseExecutionException(context,
"Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " +
Expand Down Expand Up @@ -1003,7 +968,7 @@ private void shortcutDocIdsToLoad(SearchContext context) {
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
}

private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
// process scroll
context.from(context.from() + context.size());
context.scrollContext().scroll = request.scroll();
Expand Down Expand Up @@ -1156,4 +1121,58 @@ public boolean canMatch() {
return canMatch;
}
}

/**
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
*/
private static final class SearchOperationListenerExecutor implements AutoCloseable {
private final SearchOperationListener listener;
private final SearchContext context;
private final long time;
private final boolean fetch;
private long afterQueryTime = -1;
private boolean closed = false;

SearchOperationListenerExecutor(SearchContext context) {
this(context, false, System.nanoTime());
}

SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
this.listener = context.indexShard().getSearchOperationListener();
this.context = context;
time = startTime;
this.fetch = fetch;
if (fetch) {
listener.onPreFetchPhase(context);
} else {
listener.onPreQueryPhase(context);
}
}

long success() {
return afterQueryTime = System.nanoTime();
}

@Override
public void close() {
assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case";
if (closed == false) {
closed = true;
if (afterQueryTime != -1) {
if (fetch) {
listener.onFetchPhase(context, afterQueryTime - time);
} else {
listener.onQueryPhase(context, afterQueryTime - time);
}
} else {
if (fetch) {
listener.onFailedFetchPhase(context);
} else {
listener.onFailedQueryPhase(context);
}
}
}
}
}
}

0 comments on commit 4f0a1fa

Please sign in to comment.