-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HLRC: reindex API with wait_for_completion false #35202
Changes from 7 commits
ec3c93d
3cc288d
0aa1294
686f10a
16f2550
d3ca238
ef40220
ebf0f53
5df4cbb
f66f7b0
acc66f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.client.tasks; | ||
|
||
import org.elasticsearch.action.ActionResponse; | ||
import org.elasticsearch.common.ParseField; | ||
import org.elasticsearch.common.xcontent.ConstructingObjectParser; | ||
import org.elasticsearch.common.xcontent.ObjectParser; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.tasks.TaskId; | ||
|
||
import java.io.IOException; | ||
import java.util.Objects; | ||
|
||
public class TaskSubmissionResponse extends ActionResponse { | ||
private static final ParseField TASK = new ParseField("task"); | ||
public static final ConstructingObjectParser<TaskSubmissionResponse, Void> PARSER = new ConstructingObjectParser<>( | ||
"task_submission_response", | ||
true, a -> new TaskSubmissionResponse((TaskId) a[0])); | ||
|
||
static { | ||
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); | ||
} | ||
|
||
public static TaskSubmissionResponse fromXContent(XContentParser parser) throws IOException { | ||
return PARSER.parse(parser, null); | ||
} | ||
|
||
private final TaskId task; | ||
|
||
TaskSubmissionResponse(TaskId task) { | ||
this.task = task; | ||
} | ||
|
||
/** | ||
* Get the task id | ||
* | ||
* @return the id of the reindex task. | ||
*/ | ||
public TaskId getTask() { | ||
return task; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(task); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object other) { | ||
if (this == other) { | ||
return true; | ||
} | ||
if (other == null || getClass() != other.getClass()) { | ||
return false; | ||
} | ||
TaskSubmissionResponse that = (TaskSubmissionResponse) other; | ||
return Objects.equals(task, that.task); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.client; | ||
|
||
import org.elasticsearch.action.bulk.BulkRequest; | ||
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.action.support.WriteRequest; | ||
import org.elasticsearch.client.tasks.TaskSubmissionResponse; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.xcontent.XContentType; | ||
import org.elasticsearch.index.query.IdsQueryBuilder; | ||
import org.elasticsearch.index.reindex.BulkByScrollResponse; | ||
import org.elasticsearch.index.reindex.ReindexRequest; | ||
import org.elasticsearch.rest.RestStatus; | ||
import org.elasticsearch.tasks.TaskId; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
import java.util.function.BooleanSupplier; | ||
|
||
public class ReindexIT extends ESRestHighLevelClientTestCase { | ||
|
||
public void testReindex() throws IOException { | ||
final String sourceIndex = "source1"; | ||
final String destinationIndex = "dest"; | ||
{ | ||
// Prepare | ||
Settings settings = Settings.builder() | ||
.put("number_of_shards", 1) | ||
.put("number_of_replicas", 0) | ||
.build(); | ||
createIndex(sourceIndex, settings); | ||
createIndex(destinationIndex, settings); | ||
BulkRequest bulkRequest = new BulkRequest() | ||
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) | ||
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
assertEquals( | ||
RestStatus.OK, | ||
highLevelClient().bulk( | ||
bulkRequest, | ||
RequestOptions.DEFAULT | ||
).status() | ||
); | ||
} | ||
{ | ||
// test1: create one doc in dest | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does this comment mean? i see it in a few places in this file There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will rename this - I guess someone meant to reindex one document with Id 1 from source to destination |
||
ReindexRequest reindexRequest = new ReindexRequest(); | ||
reindexRequest.setSourceIndices(sourceIndex); | ||
reindexRequest.setDestIndex(destinationIndex); | ||
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); | ||
reindexRequest.setRefresh(true); | ||
BulkByScrollResponse bulkResponse = execute(reindexRequest, highLevelClient()::reindex, highLevelClient()::reindexAsync); | ||
assertEquals(1, bulkResponse.getCreated()); | ||
assertEquals(1, bulkResponse.getTotal()); | ||
assertEquals(0, bulkResponse.getDeleted()); | ||
assertEquals(0, bulkResponse.getNoops()); | ||
assertEquals(0, bulkResponse.getVersionConflicts()); | ||
assertEquals(1, bulkResponse.getBatches()); | ||
assertTrue(bulkResponse.getTook().getMillis() > 0); | ||
assertEquals(1, bulkResponse.getBatches()); | ||
assertEquals(0, bulkResponse.getBulkFailures().size()); | ||
assertEquals(0, bulkResponse.getSearchFailures().size()); | ||
} | ||
} | ||
|
||
public void testReindexTask() throws IOException, InterruptedException { | ||
final String sourceIndex = "source123"; | ||
final String destinationIndex = "dest2"; | ||
{ | ||
// Prepare | ||
Settings settings = Settings.builder() | ||
.put("number_of_shards", 1) | ||
.put("number_of_replicas", 0) | ||
.build(); | ||
createIndex(sourceIndex, settings); | ||
createIndex(destinationIndex, settings); | ||
BulkRequest bulkRequest = new BulkRequest() | ||
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON)) | ||
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON)) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
assertEquals( | ||
RestStatus.OK, | ||
highLevelClient().bulk( | ||
bulkRequest, | ||
RequestOptions.DEFAULT | ||
).status() | ||
); | ||
} | ||
{ | ||
// test1: create one doc in dest | ||
ReindexRequest reindexRequest = new ReindexRequest(); | ||
reindexRequest.setSourceIndices(sourceIndex); | ||
reindexRequest.setDestIndex(destinationIndex); | ||
reindexRequest.setSourceQuery(new IdsQueryBuilder().addIds("1").types("type")); | ||
reindexRequest.setRefresh(true); | ||
|
||
TaskSubmissionResponse reindexSubmission = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT); | ||
|
||
BooleanSupplier hasUpgradeCompleted = checkCompletionStatus(reindexSubmission.getTask()); | ||
awaitBusy(hasUpgradeCompleted); | ||
} | ||
} | ||
|
||
private BooleanSupplier checkCompletionStatus(TaskId taskId) { | ||
return () -> { | ||
try { | ||
Response response = client().performRequest(new Request("GET", "/_tasks/" + taskId.toString())); | ||
return (boolean) entityAsMap(response).get("completed"); | ||
} catch (IOException e) { | ||
fail(e.getMessage()); | ||
return false; | ||
} | ||
}; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if there's any value to returning a ReindexSubmissionResponse rather than just a TaskId here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, maybe as TaskId is good here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was considering this, but the reason I stayed with a Wrapping object is that Rest API returns taskId wrapped inside an object where TaskID is just one field. Same is in DeleteJobResponse (ML).
However I don't mind breaking this convention. It would however look a bit odd inside TaskID to have a parser that treats taskId as a field within itself. See TaskID from latest revision where I did that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmmmmmm. I see. So the idea is that if we added more than as
TaskId
to the response then we could stick it on this object. We can do that in the REST response because of the way the JSON is structured. The way your have it we could do that here. I like that argument. Even though it looks like an extra argument, it makes sense. It might be worth sticking on the Javadoc of that object. Somewhere.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Im +1 for a response object matching the response name (like ReindexSubmissionResponse) for this "adding things to the API" reason as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking of staying with TaskSubmissionResponse with String taskId field. As this would fit into other wait_for_completion=false type methods as well.
We could in theory always make the taskSubmission abstract and return concrete responses - with all the parsing and fields being already in the abstract class. That would be a bit too complex for that I guess
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with
TaskSubmissionResponse
. And I'm fine withString taskId
. I'd also be fine with a client-sideTaskId
class that is super opaque. It is an identifier that client's aren't meant to pick apart. AString
is fine for that though.