Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML Data Frame] Persist and restore checkpoint and position #41942

Merged
merged 7 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(transformId);
}

/**
* Get the persisted stats document name from the Data Frame Transformer Id.
*
* @param transformId the id of the transform whose stats document id is wanted
* @return The id of the document where the transform stats are persisted
*/
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}

@Nullable
public String getTransformId() {
return transformId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@

public class DataFrameTransformProgress implements Writeable, ToXContentObject {

private static final ParseField TOTAL_DOCS = new ParseField("total_docs");
private static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
private static final String PERCENT_COMPLETE = "percent_complete";
public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final String PERCENT_COMPLETE = "percent_complete";

public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
@Nullable
private final String reason;

private static final ParseField TASK_STATE = new ParseField("task_state");
private static final ParseField INDEXER_STATE = new ParseField("indexer_state");
private static final ParseField CURRENT_POSITION = new ParseField("current_position");
private static final ParseField CHECKPOINT = new ParseField("checkpoint");
private static final ParseField REASON = new ParseField("reason");
private static final ParseField PROGRESS = new ParseField("progress");
public static final ParseField TASK_STATE = new ParseField("task_state");
public static final ParseField INDEXER_STATE = new ParseField("indexer_state");
public static final ParseField CURRENT_POSITION = new ParseField("current_position");
public static final ParseField CHECKPOINT = new ParseField("checkpoint");
public static final ParseField REASON = new ParseField("reason");
public static final ParseField PROGRESS = new ParseField("progress");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER = new ConstructingObjectParser<>(NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.transforms;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -14,6 +15,7 @@
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.indexing.IndexerState;

Expand All @@ -22,7 +24,7 @@

public class DataFrameTransformStateAndStats implements Writeable, ToXContentObject {

private static final String NAME = "data_frame_transform_state_and_stats";
public static final String NAME = "data_frame_transform_state_and_stats";
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField CHECKPOINTING_INFO_FIELD = new ParseField("checkpointing");

Expand All @@ -47,6 +49,10 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
(p, c) -> DataFrameTransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}

/**
* Parse a {@link DataFrameTransformStateAndStats} object from XContent.
*
* @param parser the parser, positioned at the start of the serialized object
* @return the parsed state-and-stats instance
* @throws IOException if the content cannot be read
*/
public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

/**
* Create the initial state-and-stats for a transform, delegating to the
* two-argument overload with a freshly created stats object for {@code id}.
*/
public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
return initialStateAndStats(id, new DataFrameIndexerTransformStats(id));
}
Expand All @@ -58,6 +64,15 @@ public static DataFrameTransformStateAndStats initialStateAndStats(String id, Da
DataFrameTransformCheckpointingInfo.EMPTY);
}

/**
* Get the persisted state and stats document name from the Data Frame Transform Id.
*
* @param transformId the id of the transform whose document id is wanted
* @return The id of the document where the transform state and stats are persisted
*/
public static String documentId(String transformId) {
return NAME + "-" + transformId;
}

public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats,
DataFrameTransformCheckpointingInfo checkpointingInfo) {
this.id = Objects.requireNonNull(id);
Expand All @@ -73,13 +88,21 @@ public DataFrameTransformStateAndStats(StreamInput in) throws IOException {
this.checkpointingInfo = new DataFrameTransformCheckpointingInfo(in);
}

/**
* @return the transform id recorded on the wrapped stats object,
*         or {@code null} if none is set
*/
@Nullable
public String getTransformId() {
return transformStats.getTransformId();
}

// NOTE(review): this span is a GitHub diff rendering — the "Copy link ... Learn more."
// lines below are page chrome from an inline review comment, not Java source.
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DataFrameField.ID.getPreferredName(), id);
builder.field(STATE_FIELD.getPreferredName(), transformState, params);
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), transformStats, params);
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
// Only documents written to the internal index carry the doc-type marker field,
// so API responses are not polluted with storage-only metadata.
if (params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe off-topic: what about adding the version here (only if internal storage is true), note: we can do that as separate PR if you prefer.

builder.field(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), NAME);
}
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -94,16 +95,21 @@ public synchronized IndexerState start() {
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
// NOTE(review): this span is a GitHub diff rendering — it shows BOTH sides of the
// hunk (the removed in-lambda onStop() call and the added flag-based deferral),
// plus review-comment page chrome ("Copy link ... Learn more.").
public synchronized IndexerState stop() {
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
// Atomically transition: INDEXING -> STOPPING, STARTED -> STOPPED; any other
// state is left unchanged.
IndexerState currentState = state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
onStop();
wasStartedAndSetStopped.set(true);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onStop is now persisting state so it must be called after the state has been updated.

return IndexerState.STOPPED;
} else {
return previousState;
}
});

// onStop persists state, so it must run only after the atomic state update has
// completed — hence the flag instead of calling it inside the updateAndGet lambda.
if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public static void removeIndices() throws Exception {
wipeIndices();
}

public void wipeDataFrameTransforms() throws IOException, InterruptedException {
public void wipeDataFrameTransforms() throws IOException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;

Expand Down Expand Up @@ -72,7 +72,7 @@ public void testUsage() throws Exception {
Request statsExistsRequest = new Request("GET",
DataFrameInternalIndex.INDEX_NAME+"/_search?q=" +
INDEX_DOC_TYPE.getPreferredName() + ":" +
DataFrameIndexerTransformStats.NAME);
DataFrameTransformStateAndStats.NAME);
// Verify that we have our two stats documents
assertBusy(() -> {
Map<String, Object> hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest));
Expand Down Expand Up @@ -100,7 +100,6 @@ public void testUsage() throws Exception {
expectedStats.merge(statName, statistic, Integer::sum);
}


usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));

usageAsMap = entityAsMap(usageResponse);
Expand All @@ -109,7 +108,8 @@ public void testUsage() throws Exception {
assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
assertEquals(2, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
for(String statName : PROVIDED_STATS) {
assertEquals(expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats."+statName, usageAsMap));
assertEquals("Incorrect stat " + statName,
expectedStats.get(statName), XContentMapValues.extractValue("data_frame.stats." + statName, usageAsMap));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;

Expand Down Expand Up @@ -176,6 +177,7 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo

for(String statName : PROVIDED_STATS) {
Aggregation agg = searchResponse.getAggregations().get(statName);

if (agg instanceof NumericMetricsAggregation.SingleValue) {
statisticsList.add((long)((NumericMetricsAggregation.SingleValue)agg).value());
} else {
Expand All @@ -197,14 +199,15 @@ static DataFrameIndexerTransformStats parseSearchAggs(SearchResponse searchRespo
static void getStatisticSummations(Client client, ActionListener<DataFrameIndexerTransformStats> statsListener) {
QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(),
DataFrameIndexerTransformStats.NAME)));
DataFrameTransformStateAndStats.NAME)));

SearchRequestBuilder requestBuilder = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
.setSize(0)
.setQuery(queryBuilder);

final String path = DataFrameField.STATS_FIELD.getPreferredName() + ".";
for(String statName : PROVIDED_STATS) {
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(statName));
requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(path + statName));
}

ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(
Expand All @@ -213,6 +216,7 @@ static void getStatisticSummations(Client client, ActionListener<DataFrameIndexe
logger.error("statistics summations search returned shard failures: {}",
Arrays.toString(searchResponse.getShardFailures()));
}

statsListener.onResponse(parseSearchAggs(searchResponse));
},
failure -> {
Expand Down
Loading