-
Notifications
You must be signed in to change notification settings - Fork 24.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cross Cluster Search: make remote clusters optional #27182
Changes from all commits
11e485d
2a4bac1
febd3dd
0c53821
1d79d41
38e9281
9c97344
1db4a24
64ab400
abff3e1
82b94e2
1d67334
304f2d0
19c3c84
68f056a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,8 +26,10 @@ | |
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.common.xcontent.StatusToXContentObject; | ||
import org.elasticsearch.common.xcontent.ToXContent; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.rest.RestStatus; | ||
|
@@ -43,6 +45,7 @@ | |
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure; | ||
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; | ||
|
@@ -71,15 +74,18 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb | |
|
||
private ShardSearchFailure[] shardFailures; | ||
|
||
private Clusters clusters; | ||
|
||
private long tookInMillis; | ||
|
||
public SearchResponse() { | ||
} | ||
|
||
public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards, | ||
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures) { | ||
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures, Clusters clusters) { | ||
this.internalResponse = internalResponse; | ||
this.scrollId = scrollId; | ||
this.clusters = clusters; | ||
this.totalShards = totalShards; | ||
this.successfulShards = successfulShards; | ||
this.skippedShards = skippedShards; | ||
|
@@ -199,6 +205,15 @@ public Map<String, ProfileShardResult> getProfileResults() { | |
return internalResponse.profile(); | ||
} | ||
|
||
/** | ||
* Returns info about what clusters the search was executed against. Available only in responses obtained | ||
* from a Cross Cluster Search request, otherwise <code>null</code> | ||
* @see Clusters | ||
*/ | ||
public Clusters getClusters() { | ||
return clusters; | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
builder.startObject(); | ||
|
@@ -221,6 +236,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t | |
} | ||
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(), | ||
getFailedShards(), getShardFailures()); | ||
clusters.toXContent(builder, params); | ||
internalResponse.toXContent(builder, params); | ||
return builder; | ||
} | ||
|
@@ -242,6 +258,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept | |
int skippedShards = 0; // 0 for BWC | ||
String scrollId = null; | ||
List<ShardSearchFailure> failures = new ArrayList<>(); | ||
Clusters clusters = Clusters.EMPTY; | ||
while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { | ||
if (token == XContentParser.Token.FIELD_NAME) { | ||
currentFieldName = parser.currentName(); | ||
|
@@ -296,6 +313,28 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept | |
parser.skipChildren(); | ||
} | ||
} | ||
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName)) { | ||
int successful = -1; | ||
int total = -1; | ||
int skipped = -1; | ||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { | ||
if (token == XContentParser.Token.FIELD_NAME) { | ||
currentFieldName = parser.currentName(); | ||
} else if (token.isValue()) { | ||
if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName)) { | ||
successful = parser.intValue(); | ||
} else if (Clusters.TOTAL_FIELD.match(currentFieldName)) { | ||
total = parser.intValue(); | ||
} else if (Clusters.SKIPPED_FIELD.match(currentFieldName)) { | ||
skipped = parser.intValue(); | ||
} else { | ||
parser.skipChildren(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we be more strict and just fail if this happens? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this parsing method is used in the high-level REST client, we are lenient there to guarantee forward compatibility, meaning that if one day we add a new field under _clusters, we don't break while parsing that but we rather ignore it and everything is fine. This is tested in SearchResponseTests#testFromXContentWithRandomFields There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it |
||
} | ||
} else { | ||
parser.skipChildren(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see above |
||
} | ||
} | ||
clusters = new Clusters(total, successful, skipped); | ||
} else { | ||
parser.skipChildren(); | ||
} | ||
|
@@ -304,7 +343,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept | |
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly, | ||
profile, numReducePhases); | ||
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, | ||
failures.toArray(new ShardSearchFailure[failures.size()])); | ||
failures.toArray(new ShardSearchFailure[failures.size()]), clusters); | ||
} | ||
|
||
@Override | ||
|
@@ -322,6 +361,12 @@ public void readFrom(StreamInput in) throws IOException { | |
shardFailures[i] = readShardSearchFailure(in); | ||
} | ||
} | ||
//TODO update version once backported | ||
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { | ||
clusters = new Clusters(in); | ||
} else { | ||
clusters = Clusters.EMPTY; | ||
} | ||
scrollId = in.readOptionalString(); | ||
tookInMillis = in.readVLong(); | ||
if (in.getVersion().onOrAfter(Version.V_5_6_0)) { | ||
|
@@ -340,7 +385,10 @@ public void writeTo(StreamOutput out) throws IOException { | |
for (ShardSearchFailure shardSearchFailure : shardFailures) { | ||
shardSearchFailure.writeTo(out); | ||
} | ||
|
||
//TODO update version once backported | ||
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { | ||
clusters.writeTo(out); | ||
} | ||
out.writeOptionalString(scrollId); | ||
out.writeVLong(tookInMillis); | ||
if(out.getVersion().onOrAfter(Version.V_5_6_0)) { | ||
|
@@ -353,4 +401,101 @@ public String toString() { | |
return Strings.toString(this); | ||
} | ||
|
||
/** | ||
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful | ||
* and how many of them were skipped. | ||
*/ | ||
public static class Clusters implements ToXContent, Writeable { | ||
|
||
public static final Clusters EMPTY = new Clusters(0, 0, 0); | ||
|
||
static final ParseField _CLUSTERS_FIELD = new ParseField("_clusters"); | ||
static final ParseField SUCCESSFUL_FIELD = new ParseField("successful"); | ||
static final ParseField SKIPPED_FIELD = new ParseField("skipped"); | ||
static final ParseField TOTAL_FIELD = new ParseField("total"); | ||
|
||
private final int total; | ||
private final int successful; | ||
private final int skipped; | ||
|
||
Clusters(int total, int successful, int skipped) { | ||
assert total >= 0 && successful >= 0 && skipped >= 0 | ||
: "total: " + total + " successful: " + successful + " skipped: " + skipped; | ||
assert successful <= total && skipped == total - successful | ||
: "total: " + total + " successful: " + successful + " skipped: " + skipped; | ||
this.total = total; | ||
this.successful = successful; | ||
this.skipped = skipped; | ||
} | ||
|
||
private Clusters(StreamInput in) throws IOException { | ||
this.total = in.readVInt(); | ||
this.successful = in.readVInt(); | ||
this.skipped = in.readVInt(); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeVInt(total); | ||
out.writeVInt(successful); | ||
out.writeVInt(skipped); | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
if (this != EMPTY) { | ||
builder.startObject(_CLUSTERS_FIELD.getPreferredName()); | ||
builder.field(TOTAL_FIELD.getPreferredName(), total); | ||
builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful); | ||
builder.field(SKIPPED_FIELD.getPreferredName(), skipped); | ||
builder.endObject(); | ||
} | ||
return builder; | ||
} | ||
|
||
/** | ||
* Returns how many total clusters the search was requested to be executed on | ||
*/ | ||
public int getTotal() { | ||
return total; | ||
} | ||
|
||
/** | ||
* Returns how many total clusters the search was executed successfully on | ||
*/ | ||
public int getSuccessful() { | ||
return successful; | ||
} | ||
|
||
/** | ||
* Returns how many total clusters were during the execution of the search request | ||
*/ | ||
public int getSkipped() { | ||
return skipped; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
Clusters clusters = (Clusters) o; | ||
return total == clusters.total && | ||
successful == clusters.successful && | ||
skipped == clusters.skipped; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(total, successful, skipped); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}'; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid not, SearchResponse implements Streamable and not Writeable as it is an ActionResponse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough