Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML][Transforms] add wait_for_checkpoint flag to stop #47935

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.elasticsearch.client.transform.DeleteTransformRequest.FORCE;
import static org.elasticsearch.client.transform.GetTransformRequest.ALLOW_NO_MATCH;
import static org.elasticsearch.client.transform.PutTransformRequest.DEFER_VALIDATION;
import static org.elasticsearch.client.transform.StopTransformRequest.WAIT_FOR_CHECKPOINT;

final class TransformRequestConverters {

Expand Down Expand Up @@ -135,6 +136,9 @@ static Request stopTransform(StopTransformRequest stopRequest) {
if (stopRequest.getAllowNoMatch() != null) {
request.addParameter(ALLOW_NO_MATCH, stopRequest.getAllowNoMatch().toString());
}
if (stopRequest.getWaitForCheckpoint() != null) {
request.addParameter(WAIT_FOR_CHECKPOINT, stopRequest.getWaitForCheckpoint().toString());
}
request.addParameters(params.asMap());
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@

public class StopTransformRequest implements Validatable {

public static final String WAIT_FOR_CHECKPOINT = "wait_for_checkpoint";

private final String id;
private Boolean waitForCompletion;
private Boolean waitForCheckpoint;
private TimeValue timeout;
private Boolean allowNoMatch;

public StopTransformRequest(String id) {
this.id = id;
waitForCompletion = null;
timeout = null;
this(id, null, null, null);
}

public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout) {
public StopTransformRequest(String id, Boolean waitForCompletion, TimeValue timeout, Boolean waitForCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly I see 1 potential problem:

  1. call _stop?wait_for_completion=true&wait_for_checkpoint=true

this lets the call block

  1. call _stop?wait_for_checkpoint=false

that's fine, you should be able to switch wait_for_checkpoint by calling _stop again, however: the call to 1 will return after this call and there is no indication that we did not stop at a checkpoint. I think we should add a field to the response object noting whether the api has stopped at a checkpoint or not.

(In that respect I also wonder if it should be possible to revert the decission of wait_for_checkpoint by a call to _start)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would also be in line with a general improvement of Response objects (probably better placed in a separate PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hendrikmuhs I agree on indicating if the stop was indeed at a checkpoint or not. That information would be available via a _stats call, but I suppose we can add something here.

As for negating with a call to _start, I do not want to complicate these interactions any further. I am against it. If somebody wants to start it again, they should call _stop?wait_for_checkpoint=false and then _start again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have to think about being able to alert the user on if the _stop indeed caused the transform to stop at a checkpoint or not.

when wait_for_completion is spinning, it is checking on every cluster state update. Since we don't store this state information inside cluster state, there is nothing indicating if the value changed or not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hendrikmuhs after digging around, I don't think this is possible. There are no hooks into how the task is cleared and we consider it "stopped" when it is deleted, consequently losing insight into how it was stopped.

this.id = id;
this.waitForCompletion = waitForCompletion;
this.timeout = timeout;
this.waitForCheckpoint = waitForCheckpoint;
}

public String getId() {
Expand Down Expand Up @@ -73,6 +75,14 @@ public void setAllowNoMatch(Boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
}

public Boolean getWaitForCheckpoint() {
return waitForCheckpoint;
}

public void setWaitForCheckpoint(Boolean waitForCheckpoint) {
this.waitForCheckpoint = waitForCheckpoint;
}

@Override
public Optional<ValidationException> validate() {
if (id == null) {
Expand All @@ -86,7 +96,7 @@ public Optional<ValidationException> validate() {

@Override
public int hashCode() {
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch);
return Objects.hash(id, waitForCompletion, timeout, allowNoMatch, waitForCheckpoint);
}

@Override
Expand All @@ -102,6 +112,7 @@ public boolean equals(Object obj) {
return Objects.equals(this.id, other.id)
&& Objects.equals(this.waitForCompletion, other.waitForCompletion)
&& Objects.equals(this.timeout, other.timeout)
&& Objects.equals(this.waitForCheckpoint, other.waitForCheckpoint)
&& Objects.equals(this.allowNoMatch, other.allowNoMatch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void indexData(String indexName) throws IOException {
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().transform().stopTransform(
new StopTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
new StopTransformRequest(transformId, Boolean.TRUE, null, false), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testStartStop() throws IOException {
assertThat(taskState, oneOf(TransformStats.State.STARTED, TransformStats.State.INDEXING,
TransformStats.State.STOPPING, TransformStats.State.STOPPED));

StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null);
StopTransformRequest stopRequest = new StopTransformRequest(id, Boolean.TRUE, null, false);
StopTransformResponse stopResponse =
execute(stopRequest, client::stopTransform, client::stopTransformAsync);
assertTrue(stopResponse.isAcknowledged());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ public void testStopDataFrameTransform() {
if (randomBoolean()) {
timeValue = TimeValue.parseTimeValue(randomTimeValue(), "timeout");
}
StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue);
Boolean waitForCheckpoint = null;
if (randomBoolean()) {
waitForCheckpoint = randomBoolean();
}

StopTransformRequest stopRequest = new StopTransformRequest(id, waitForCompletion, timeValue, waitForCheckpoint);

Request request = TransformRequestConverters.stopTransform(stopRequest);
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
Expand All @@ -168,6 +173,12 @@ public void testStopDataFrameTransform() {
} else {
assertFalse(request.getParameters().containsKey("timeout"));
}
if (waitForCheckpoint != null) {
assertTrue(request.getParameters().containsKey("wait_for_checkpoint"));
assertEquals(stopRequest.getWaitForCheckpoint(), Boolean.parseBoolean(request.getParameters().get("wait_for_checkpoint")));
} else {
assertFalse(request.getParameters().containsKey("wait_for_checkpoint"));
}

assertFalse(request.getParameters().containsKey(ALLOW_NO_MATCH));
stopRequest.setAllowNoMatch(randomBoolean());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
public void cleanUpTransforms() throws Exception {
for (String transformId : transformsToClean) {
highLevelClient().transform().stopTransform(
new StopTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
new StopTransformRequest(transformId, true, TimeValue.timeValueSeconds(20), false), RequestOptions.DEFAULT);
}

for (String transformId : transformsToClean) {
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/transform/apis/stop-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ are no matches or only partial matches.
state completely stops. If set to `false`, the API returns immediately and the
indexer will be stopped asynchronously in the background. Defaults to `false`.

`wait_for_checkpoint`::
(Optional, boolean) If set to `true`, the transform will not completely stop
until the current checkpoint is completed. If set to `false`, the transform
stops as soon as possible. Defaults to `false`.

[[stop-transform-response-codes]]
==== {api-response-codes-title}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class TransformField {
public static final ParseField GROUP_BY = new ParseField("group_by");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint");
public static final ParseField STATS_FIELD = new ParseField("stats");
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,23 @@ public static class Request extends BaseTasksRequest<Request> {
private final boolean waitForCompletion;
private final boolean force;
private final boolean allowNoMatch;
private final boolean waitForCheckpoint;
private Set<String> expandedIds;

public Request(String id, boolean waitForCompletion, boolean force, @Nullable TimeValue timeout, boolean allowNoMatch) {
public Request(String id,
boolean waitForCompletion,
boolean force,
@Nullable TimeValue timeout,
boolean allowNoMatch,
boolean waitForCheckpoint) {
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.waitForCompletion = waitForCompletion;
this.force = force;

// use the timeout value already present in BaseTasksRequest
this.setTimeout(timeout == null ? DEFAULT_TIMEOUT : timeout);
this.allowNoMatch = allowNoMatch;
this.waitForCheckpoint = waitForCheckpoint;
}

public Request(StreamInput in) throws IOException {
Expand All @@ -73,6 +80,11 @@ public Request(StreamInput in) throws IOException {
} else {
this.allowNoMatch = true;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.waitForCheckpoint = in.readBoolean();
} else {
this.waitForCheckpoint = false;
}
}

public String getId() {
Expand All @@ -99,6 +111,10 @@ public boolean isAllowNoMatch() {
return allowNoMatch;
}

public boolean isWaitForCheckpoint() {
return waitForCheckpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand All @@ -113,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeBoolean(allowNoMatch);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(waitForCheckpoint);
}
}

@Override
Expand All @@ -123,7 +142,7 @@ public ActionRequestValidationException validate() {
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch);
return Objects.hash(id, waitForCompletion, force, expandedIds, this.getTimeout(), allowNoMatch, waitForCheckpoint);
}

@Override
Expand All @@ -146,6 +165,7 @@ public boolean equals(Object obj) {
Objects.equals(waitForCompletion, other.waitForCompletion) &&
Objects.equals(force, other.force) &&
Objects.equals(expandedIds, other.expandedIds) &&
Objects.equals(waitForCheckpoint, other.waitForCheckpoint) &&
allowNoMatch == other.allowNoMatch;
}

Expand Down
Loading