Skip to content

Commit

Permalink
[7.x][ML] Delete unused data frame analytics state (elastic#50243)
Browse files Browse the repository at this point in the history
This commit adds removal of unused data frame analytics state
from the _delete_expired_data API (and, by extension, the ML daily
maintenance task). At the moment the potential state docs
include the progress document and state for regression and
classification analyses.

Backport of elastic#50243
  • Loading branch information
dimitris-athanasiou committed Dec 17, 2019
1 parent 726c35d commit 5c1dc6b
Show file tree
Hide file tree
Showing 25 changed files with 235 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.dataframe;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -310,6 +311,15 @@ public static String documentId(String id) {
return TYPE + "-" + id;
}

/**
 * Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
 */
@Nullable
public static String extractJobIdFromDocId(String docId) {
    // Plain prefix stripping instead of replaceAll: avoids compiling a regex on
    // every call and stays correct even if TYPE ever contains regex metacharacters.
    String prefix = TYPE + "-";
    return docId.startsWith(prefix) ? docId.substring(prefix.length()) : null;
}

public static class Builder {

private String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class Classification implements DataFrameAnalysis {
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");

private static final String STATE_DOC_ID_SUFFIX = "_classification_state#1";

private static final ConstructingObjectParser<Classification, Void> LENIENT_PARSER = createParser(true);
private static final ConstructingObjectParser<Classification, Void> STRICT_PARSER = createParser(false);

Expand Down Expand Up @@ -258,7 +260,12 @@ public boolean persistsState() {

@Override
public String getStateDocId(String jobId) {
return jobId + "_classification_state#1";
return jobId + STATE_DOC_ID_SUFFIX;
}

/**
 * Extracts the job id from a state doc id produced by {@link #getStateDocId(String)}.
 *
 * @param stateDocId the state document id
 * @return the job id, or {@code null} if {@code stateDocId} does not end with the
 *         classification state suffix or has an empty job id part
 */
public static String extractJobIdFromStateDoc(String stateDocId) {
    // Require the suffix at the END of the id. lastIndexOf alone would also accept
    // ids where the suffix occurs in the middle (e.g. "foo_classification_state#1bar")
    // and wrongly report "foo" as the job id.
    int prefixLength = stateDocId.length() - STATE_DOC_ID_SUFFIX.length();
    return stateDocId.endsWith(STATE_DOC_ID_SUFFIX) && prefixLength > 0
        ? stateDocId.substring(0, prefixLength)
        : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class Regression implements DataFrameAnalysis {
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");

private static final String STATE_DOC_ID_SUFFIX = "_regression_state#1";

private static final ConstructingObjectParser<Regression, Void> LENIENT_PARSER = createParser(true);
private static final ConstructingObjectParser<Regression, Void> STRICT_PARSER = createParser(false);

Expand Down Expand Up @@ -196,7 +198,12 @@ public boolean persistsState() {

@Override
public String getStateDocId(String jobId) {
return jobId + "_regression_state#1";
return jobId + STATE_DOC_ID_SUFFIX;
}

/**
 * Extracts the job id from a state doc id produced by {@link #getStateDocId(String)}.
 *
 * @param stateDocId the state document id
 * @return the job id, or {@code null} if {@code stateDocId} does not end with the
 *         regression state suffix or has an empty job id part
 */
public static String extractJobIdFromStateDoc(String stateDocId) {
    // Require the suffix at the END of the id. lastIndexOf alone would also accept
    // ids where the suffix occurs in the middle (e.g. "foo_regression_state#1bar")
    // and wrongly report "foo" as the job id.
    int prefixLength = stateDocId.length() - STATE_DOC_ID_SUFFIX.length();
    return stateDocId.endsWith(STATE_DOC_ID_SUFFIX) && prefixLength > 0
        ? stateDocId.substring(0, prefixLength)
        : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<DataFrameAnalyticsConfig> {
Expand Down Expand Up @@ -384,6 +385,13 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsBeforeItWas
}
}

public void testExtractJobIdFromDocId() {
    // A well-formed doc id yields the part after the config type prefix.
    assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"), equalTo("foo"));
    // Only the leading prefix is stripped; a repeated prefix stays part of the job id.
    String doubled = "data_frame_analytics_config-data_frame_analytics_config-foo";
    assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId(doubled), equalTo("data_frame_analytics_config-foo"));
    // An id without the prefix is invalid and maps to null.
    assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
}

private static void assertTooSmall(ElasticsearchStatusException e) {
assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,9 @@ public void testGetStateDocId() {
String randomId = randomAlphaOfLength(10);
assertThat(classification.getStateDocId(randomId), equalTo(randomId + "_classification_state#1"));
}

public void testExtractJobIdFromStateDoc() {
    // A state doc id carrying the classification suffix yields the job id before it.
    String validStateDocId = "foo_bar-1_classification_state#1";
    assertThat(Classification.extractJobIdFromStateDoc(validStateDocId), equalTo("foo_bar-1"));
    // An id without the suffix is not a classification state doc and maps to null.
    assertThat(Classification.extractJobIdFromStateDoc("noop"), is(nullValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public void testGetStateDocId() {
assertThat(regression.getStateDocId(randomId), equalTo(randomId + "_regression_state#1"));
}

public void testExtractJobIdFromStateDoc() {
    // A state doc id carrying the regression suffix yields the job id before it.
    String validStateDocId = "foo_bar-1_regression_state#1";
    assertThat(Regression.extractJobIdFromStateDoc(validStateDocId), equalTo("foo_bar-1"));
    // An id without the suffix is not a regression state doc and maps to null.
    assertThat(Regression.extractJobIdFromStateDoc("noop"), is(nullValue()));
}

public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {
Regression regression = createRandom();
assertThat(regression.getRandomizeSeed(), is(notNullValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
Expand Down Expand Up @@ -314,6 +316,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}

/**
 * Verifies that the _delete_expired_data API deletes analytics state documents
 * (stored progress and model state) only once the owning job config is gone:
 * while the config exists the state is kept; after the config is deleted
 * directly from the config index, a second call removes the now-orphaned state.
 */
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("classification_delete_expired_data");
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);

// Run a classification job to completion so it persists progress and model state.
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

// Sanity-check that progress, model state and the inference model were persisted.
assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);

// Call _delete_expired_data API and check nothing was deleted
assertThat(deleteExpiredData().isDeleted(), is(true));
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());

// Delete the config straight from the config index
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));

// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));

// With the config gone, every state doc for this job counts as unused and is removed.
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}

private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void tearDownData() {
cleanUp();
}

public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
public void testDeleteExpiredData_GivenNothingToDelete() throws Exception {
// Tests that nothing goes wrong when there's nothing to delete
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
}
Expand Down Expand Up @@ -202,10 +202,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));

// Now call the action under test
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();

// We need to refresh to ensure the deletion is visible
client().admin().indices().prepareRefresh("*").get();
assertThat(deleteExpiredData().isDeleted(), is(true));

// no-retention job should have kept all data
assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
Expand All @@ -45,7 +46,6 @@
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

Expand Down Expand Up @@ -205,7 +205,7 @@ protected void assertProgress(String id, int reindexing, int loadingData, int an
}

protected SearchResponse searchStoredProgress(String jobId) {
String docId = DataFrameAnalyticsTask.progressDocId(jobId);
String docId = StoredProgress.documentId(jobId);
return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setQuery(QueryBuilders.idsQuery().addIds(docId))
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
Expand Down Expand Up @@ -113,6 +114,16 @@ private void waitForPendingTasks() {
}
}

/**
 * Invokes the _delete_expired_data action and refreshes all indices so that
 * any deletions it performed are visible to subsequent searches.
 *
 * @return the action's response
 */
protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request();
DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE, request).get();

// We need to refresh to ensure the deletion is visible
client().admin().indices().prepareRefresh("*").get();

return response;
}

@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
Expand Down Expand Up @@ -272,6 +274,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exceptio
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}

/**
 * Verifies that the _delete_expired_data API deletes analytics state documents
 * (stored progress and model state) only once the owning job config is gone:
 * while the config exists the state is kept; after the config is deleted
 * directly from the config index, a second call removes the now-orphaned state.
 */
public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("regression_delete_expired_data");
indexData(sourceIndex, 100, 0);

// Run a regression job to completion so it persists progress and model state.
DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

// Sanity-check that progress, model state and the inference model were persisted.
assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);

// Call _delete_expired_data API and check nothing was deleted
assertThat(deleteExpiredData().isDeleted(), is(true));
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());

// Delete the config straight from the config index
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));

// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));

// With the config gone, every state doc for this job counts as unused and is removed.
SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}

private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
Expand Down Expand Up @@ -171,7 +171,7 @@ private void deleteState(ParentTaskAssigningClient parentTaskClient,
DataFrameAnalyticsConfig config,
ActionListener<BulkByScrollResponse> listener) {
List<String> ids = new ArrayList<>();
ids.add(DataFrameAnalyticsTask.progressDocId(config.getId()));
ids.add(StoredProgress.documentId(config.getId()));
if (config.getAnalysis().persistsState()) {
ids.add(config.getAnalysis().getStateDocId(config.getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void searchStoredProgresses(List<String> configIds, ActionListener<List<
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.source().size(1);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(DataFrameAnalyticsTask.progressDocId(configId)));
searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId)));
multiSearchRequest.add(searchRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private void persistProgress(Runnable runnable) {
statsResponse -> {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
indexRequest.id(progressDocId(taskParams.getId()));
indexRequest.id(StoredProgress.documentId(taskParams.getId()));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
Expand Down Expand Up @@ -310,10 +310,6 @@ public static StartingState determineStartingState(String jobId, List<PhaseProgr
}
}

public static String progressDocId(String id) {
return "data_frame_analytics-" + id + "-progress";
}

public static class ProgressTracker {

public static final String REINDEXING = "reindexing";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.dataframe;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand All @@ -14,6 +15,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StoredProgress implements ToXContentObject {

Expand Down Expand Up @@ -57,4 +60,15 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(progress);
}

/**
 * Builds the id of the stored-progress document for the given analytics id.
 */
public static String documentId(String id) {
    return "data_frame_analytics-" + id + "-progress";
}

// Cache the pattern: Pattern.compile is relatively expensive and the regex never changes.
private static final Pattern EXTRACT_JOB_ID_PATTERN = Pattern.compile("^data_frame_analytics-(.*)-progress$");

/**
 * Returns the job id encoded in a progress doc id produced by {@link #documentId(String)},
 * or {@code null} if the doc id does not match the expected format.
 */
@Nullable
public static String extractJobIdFromDocId(String docId) {
    Matcher matcher = EXTRACT_JOB_ID_PATTERN.matcher(docId);
    return matcher.find() ? matcher.group(1) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;

import java.io.IOException;
import java.io.InputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;

public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator<Result<T>> {

Expand Down
Loading

0 comments on commit 5c1dc6b

Please sign in to comment.