Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross Cluster Search: make remote clusters optional #27182

Merged
merged 15 commits into from
Nov 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
new SearchHit[0], 0L, 0.0f),
new InternalAggregations(Collections.emptyList()),
new Suggest(Collections.emptyList()),
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, new ShardSearchFailure[0]));
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public void testInfo() throws IOException {
public void testSearchScroll() throws IOException {
Header[] headers = randomHeaders(random(), "Header");
SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY,
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, new ShardSearchFailure[0]);
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY);
mockResponse(mockSearchResponse);
SearchResponse searchResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(randomAlphaOfLengthBetween(5, 10)),
headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,5 +470,6 @@ private static void assertSearchHeader(SearchResponse searchResponse) {
assertThat(searchResponse.getTotalShards(), greaterThan(0));
assertEquals(searchResponse.getTotalShards(), searchResponse.getSuccessfulShards());
assertEquals(0, searchResponse.getShardFailures().length);
assertEquals(SearchResponse.Clusters.EMPTY, searchResponse.getClusters());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject {

public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
new DiscoveryNode[0], Collections.emptyMap());

private ClusterSearchShardsGroup[] groups;
private DiscoveryNode[] nodes;
private Map<String, AliasFilter> indicesAndFilters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,16 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger skippedOps = new AtomicInteger();
private final TransportSearchAction.SearchTimeProvider timeProvider;

private final SearchResponse.Clusters clusters;

protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests) {
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests,
SearchResponse.Clusters clusters) {
super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor);
this.timeProvider = timeProvider;
this.logger = logger;
Expand All @@ -90,6 +91,7 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS
this.concreteIndexBoosts = concreteIndexBoosts;
this.aliasFilter = aliasFilter;
this.results = resultConsumer;
this.clusters = clusters;
}

/**
Expand All @@ -108,7 +110,7 @@ public final void start() {
//no search shards to search on, bail with empty response
//(it happens with search across _all with no indices around and consistent with broadcast operations)
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY));
ShardSearchFailure.EMPTY_ARRAY, clusters));
return;
}
executePhase(this);
Expand Down Expand Up @@ -264,7 +266,7 @@ public final SearchRequest getRequest() {
@Override
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), buildShardFailures());
skippedOps.get(), buildTookInMillis(), buildShardFailures(), clusters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.stream.Stream;

/**
* This search phrase can be used as an initial search phase to pre-filter search shards based on query rewriting.
* This search phase can be used as an initial search phase to pre-filter search shards based on query rewriting.
* The queries are rewritten against the shards and based on the rewrite result shards might be able to be excluded
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
Expand All @@ -50,13 +50,15 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
SearchResponse.Clusters clusters) {
/*
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
*/
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size());
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(),
clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion, final SearchTask task) {
final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
request.getMaxConcurrentShardRequests());
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
final SearchRequest request, final ActionListener<SearchResponse> listener,
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task) {
long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) {
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
request.getMaxConcurrentShardRequests());
request.getMaxConcurrentShardRequests(), clusters);
this.searchPhaseController = searchPhaseController;
}

Expand Down
151 changes: 148 additions & 3 deletions core/src/main/java/org/elasticsearch/action/search/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -43,6 +45,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
Expand Down Expand Up @@ -71,15 +74,18 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb

private ShardSearchFailure[] shardFailures;

private Clusters clusters;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be final?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid not, SearchResponse implements Streamable and not Writeable as it is an ActionResponse.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough


private long tookInMillis;

public SearchResponse() {
}

public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards,
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures, Clusters clusters) {
this.internalResponse = internalResponse;
this.scrollId = scrollId;
this.clusters = clusters;
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.skippedShards = skippedShards;
Expand Down Expand Up @@ -199,6 +205,15 @@ public Map<String, ProfileShardResult> getProfileResults() {
return internalResponse.profile();
}

/**
* Returns info about what clusters the search was executed against. Available only in responses obtained
* from a Cross Cluster Search request, otherwise <code>null</code>
* @see Clusters
*/
public Clusters getClusters() {
return clusters;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -221,6 +236,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
}
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(),
getFailedShards(), getShardFailures());
clusters.toXContent(builder, params);
internalResponse.toXContent(builder, params);
return builder;
}
Expand All @@ -242,6 +258,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
int skippedShards = 0; // 0 for BWC
String scrollId = null;
List<ShardSearchFailure> failures = new ArrayList<>();
Clusters clusters = Clusters.EMPTY;
while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -296,6 +313,28 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
parser.skipChildren();
}
}
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName)) {
int successful = -1;
int total = -1;
int skipped = -1;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName)) {
successful = parser.intValue();
} else if (Clusters.TOTAL_FIELD.match(currentFieldName)) {
total = parser.intValue();
} else if (Clusters.SKIPPED_FIELD.match(currentFieldName)) {
skipped = parser.intValue();
} else {
parser.skipChildren();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be more strict and just fail if this happens?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this parsing method is used in the high-level REST client, we are lenient there to guarantee forward compatibility, meaning that if one day we add a new field under _clusters, we don't break while parsing that but we rather ignore it and everything is fine. This is tested in SearchResponseTests#testFromXContentWithRandomFields

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

}
} else {
parser.skipChildren();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

}
}
clusters = new Clusters(total, successful, skipped);
} else {
parser.skipChildren();
}
Expand All @@ -304,7 +343,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly,
profile, numReducePhases);
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis,
failures.toArray(new ShardSearchFailure[failures.size()]));
failures.toArray(new ShardSearchFailure[failures.size()]), clusters);
}

@Override
Expand All @@ -322,6 +361,12 @@ public void readFrom(StreamInput in) throws IOException {
shardFailures[i] = readShardSearchFailure(in);
}
}
//TODO update version once backported
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
clusters = new Clusters(in);
} else {
clusters = Clusters.EMPTY;
}
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
Expand All @@ -340,7 +385,10 @@ public void writeTo(StreamOutput out) throws IOException {
for (ShardSearchFailure shardSearchFailure : shardFailures) {
shardSearchFailure.writeTo(out);
}

//TODO update version once backported
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
clusters.writeTo(out);
}
out.writeOptionalString(scrollId);
out.writeVLong(tookInMillis);
if(out.getVersion().onOrAfter(Version.V_5_6_0)) {
Expand All @@ -353,4 +401,101 @@ public String toString() {
return Strings.toString(this);
}

/**
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
* and how many of them were skipped.
*/
public static class Clusters implements ToXContent, Writeable {

public static final Clusters EMPTY = new Clusters(0, 0, 0);

static final ParseField _CLUSTERS_FIELD = new ParseField("_clusters");
static final ParseField SUCCESSFUL_FIELD = new ParseField("successful");
static final ParseField SKIPPED_FIELD = new ParseField("skipped");
static final ParseField TOTAL_FIELD = new ParseField("total");

private final int total;
private final int successful;
private final int skipped;

Clusters(int total, int successful, int skipped) {
assert total >= 0 && successful >= 0 && skipped >= 0
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
assert successful <= total && skipped == total - successful
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
this.total = total;
this.successful = successful;
this.skipped = skipped;
}

private Clusters(StreamInput in) throws IOException {
this.total = in.readVInt();
this.successful = in.readVInt();
this.skipped = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total);
out.writeVInt(successful);
out.writeVInt(skipped);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (this != EMPTY) {
builder.startObject(_CLUSTERS_FIELD.getPreferredName());
builder.field(TOTAL_FIELD.getPreferredName(), total);
builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful);
builder.field(SKIPPED_FIELD.getPreferredName(), skipped);
builder.endObject();
}
return builder;
}

/**
* Returns how many total clusters the search was requested to be executed on
*/
public int getTotal() {
return total;
}

/**
* Returns how many total clusters the search was executed successfully on
*/
public int getSuccessful() {
return successful;
}

/**
* Returns how many total clusters were during the execution of the search request
*/
public int getSkipped() {
return skipped;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Clusters clusters = (Clusters) o;
return total == clusters.total &&
successful == clusters.successful &&
skipped == clusters.skipped;
}

@Override
public int hashCode() {
return Objects.hash(total, successful, skipped);
}

@Override
public String toString() {
return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryP
scrollId = request.scrollId();
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
0, buildTookInMillis(), buildShardFailures()));
0, buildTookInMillis(), buildShardFailures(), SearchResponse.Clusters.EMPTY));
} catch (Exception e) {
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
}
Expand Down
Loading