Skip to content

Commit

Permalink
Fix concurrent issue in SearchPhaseController
Browse files Browse the repository at this point in the history
The list used by the search progress listener can be nullified
by another thread that reports a query result. This change replaces
the usage of this list with a new array that is synchronously modified.

Closes elastic#49778
  • Loading branch information
jimczi committed Dec 4, 2019
1 parent f38ce06 commit 7554648
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
Expand Down Expand Up @@ -564,6 +565,7 @@ public InternalSearchResponse buildResponse(SearchHits hits) {
* iff the buffer is exhausted.
*/
static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhaseResult> {
private final SearchShardTarget[] processedShards;
private final InternalAggregations[] aggsBuffer;
private final TopDocs[] topDocsBuffer;
private final boolean hasAggs;
Expand Down Expand Up @@ -600,6 +602,7 @@ private QueryPhaseResultConsumer(SearchProgressListener progressListener, Search
}
this.controller = controller;
this.progressListener = progressListener;
this.processedShards = new SearchShardTarget[expectedResultSize];
// no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
Expand Down Expand Up @@ -636,7 +639,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()),
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
Expand All @@ -650,6 +653,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}

private synchronized List<InternalAggregations> getRemainingAggs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregations;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -169,6 +171,13 @@ final List<SearchShard> searchShards(List<? extends SearchPhaseResult> results)
.collect(Collectors.toUnmodifiableList());
}

final List<SearchShard> searchShards(SearchShardTarget[] results) {
return Arrays.stream(results)
.filter(Objects::nonNull)
.map(e -> new SearchShard(e.getClusterAlias(), e.getShardId()))
.collect(Collectors.toUnmodifiableList());
}

final List<SearchShard> searchShards(GroupShardsIterator<SearchShardIterator> its) {
return StreamSupport.stream(its.spliterator(), false)
.map(e -> new SearchShard(e.getClusterAlias(), e.shardId()))
Expand Down

0 comments on commit 7554648

Please sign in to comment.