[ML] Delete unused data frame analytics state #50243

Merged
DataFrameAnalyticsConfig.java
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.ml.dataframe;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -310,6 +311,15 @@ public static String documentId(String id) {
return TYPE + "-" + id;
}

/**
* Returns the job id from the doc id. Returns {@code null} if the doc id is invalid.
*/
@Nullable
public static String extractJobIdFromDocId(String docId) {
String jobId = docId.replaceAll("^" + TYPE + "-", "");
return jobId.equals(docId) ? null : jobId;
}

public static class Builder {

private String id;
Classification.java
@@ -39,6 +39,8 @@ public class Classification implements DataFrameAnalysis {
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");

private static final String STATE_DOC_ID_SUFFIX = "_classification_state#1";
Member: Should the suffix have the #1 attached? It feels like that's a number that will change.

Contributor Author: I also thought about this. We currently handle just a single doc. But we can fix this if and when we switch to handling state split over multiple docs.
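For illustration, a stand-alone sketch of the round trip these two helpers implement (class and method names below are illustrative, not part of the PR; Regression follows the same pattern with `_regression_state#1`):

public class StateDocIdRoundTrip {

    private static final String SUFFIX = "_classification_state#1";

    // Mirrors Classification#getStateDocId from this PR.
    static String stateDocId(String jobId) {
        return jobId + SUFFIX;
    }

    // Mirrors Classification#extractJobIdFromStateDoc: returns null when the
    // suffix is missing or nothing precedes it (suffixIndex <= 0).
    static String extractJobId(String stateDocId) {
        int suffixIndex = stateDocId.lastIndexOf(SUFFIX);
        return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
    }

    public static void main(String[] args) {
        System.out.println(extractJobId(stateDocId("foo_bar-1"))); // prints: foo_bar-1
        System.out.println(extractJobId("noop"));                  // prints: null
    }
}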


private static final ConstructingObjectParser<Classification, Void> LENIENT_PARSER = createParser(true);
private static final ConstructingObjectParser<Classification, Void> STRICT_PARSER = createParser(false);

@@ -256,7 +258,12 @@ public boolean persistsState() {

@Override
public String getStateDocId(String jobId) {
return jobId + "_classification_state#1";
return jobId + STATE_DOC_ID_SUFFIX;
}

public static String extractJobIdFromStateDoc(String stateDocId) {
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
}

@Override
Regression.java
@@ -36,6 +36,8 @@ public class Regression implements DataFrameAnalysis {
public static final ParseField TRAINING_PERCENT = new ParseField("training_percent");
public static final ParseField RANDOMIZE_SEED = new ParseField("randomize_seed");

private static final String STATE_DOC_ID_SUFFIX = "_regression_state#1";

private static final ConstructingObjectParser<Regression, Void> LENIENT_PARSER = createParser(true);
private static final ConstructingObjectParser<Regression, Void> STRICT_PARSER = createParser(false);

@@ -196,7 +198,12 @@ public boolean persistsState() {

@Override
public String getStateDocId(String jobId) {
return jobId + "_regression_state#1";
return jobId + STATE_DOC_ID_SUFFIX;
}

public static String extractJobIdFromStateDoc(String stateDocId) {
int suffixIndex = stateDocId.lastIndexOf(STATE_DOC_ID_SUFFIX);
return suffixIndex <= 0 ? null : stateDocId.substring(0, suffixIndex);
}

@Override
DataFrameAnalyticsConfigTests.java
@@ -53,6 +53,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class DataFrameAnalyticsConfigTests extends AbstractSerializingTestCase<DataFrameAnalyticsConfig> {
@@ -384,6 +385,13 @@ public void testToXContent_GivenAnalysisWithRandomizeSeedAndVersionIsBeforeItWas
}
}

public void testExtractJobIdFromDocId() {
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-foo"), equalTo("foo"));
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("data_frame_analytics_config-data_frame_analytics_config-foo"),
equalTo("data_frame_analytics_config-foo"));
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
}
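(The second assertion passes because extractJobIdFromDocId anchors its pattern at the start of the doc id, so replaceAll strips only the leading data_frame_analytics_config- prefix and leaves the second occurrence, which belongs to the job id itself, intact.)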

private static void assertTooSmall(ElasticsearchStatusException e) {
assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
}
ClassificationTests.java
@@ -216,4 +216,9 @@ public void testGetStateDocId() {
String randomId = randomAlphaOfLength(10);
assertThat(classification.getStateDocId(randomId), equalTo(randomId + "_classification_state#1"));
}

public void testExtractJobIdFromStateDoc() {
assertThat(Classification.extractJobIdFromStateDoc("foo_bar-1_classification_state#1"), equalTo("foo_bar-1"));
assertThat(Classification.extractJobIdFromStateDoc("noop"), is(nullValue()));
}
}
RegressionTests.java
@@ -110,6 +110,11 @@ public void testGetStateDocId() {
assertThat(regression.getStateDocId(randomId), equalTo(randomId + "_regression_state#1"));
}

public void testExtractJobIdFromStateDoc() {
assertThat(Regression.extractJobIdFromStateDoc("foo_bar-1_regression_state#1"), equalTo("foo_bar-1"));
assertThat(Regression.extractJobIdFromStateDoc("noop"), is(nullValue()));
}

public void testToXContent_GivenVersionBeforeRandomizeSeedWasIntroduced() throws IOException {
Regression regression = createRandom();
assertThat(regression.getRandomizeSeed(), is(notNullValue()));
ClassificationIT.java
@@ -10,13 +10,15 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
@@ -315,6 +317,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}

public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("classification_delete_expired_data");
indexData(sourceIndex, 100, 0, KEYWORD_FIELD);

DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Classification(KEYWORD_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);

// Call _delete_expired_data API and check nothing was deleted
assertThat(deleteExpiredData().isDeleted(), is(true));
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());

Member: Why delete the config? Does DeleteExpiredDataAction require the config to be deleted?

Contributor Author: Yes, for the state to become "unused", i.e. there is no job owning those state docs.

// Delete the config straight from the config index
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));

// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));

SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}
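The remover that performs this cleanup is not among the hunks shown here. As a minimal sketch of the idea in the exchange above (a state doc is "unused" once no analytics config owns it), with all names hypothetical:

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: find state docs whose job no longer has a config.
public class UnusedStateSketch {

    static List<String> findUnusedStateDocIds(List<String> stateDocIds, Set<String> existingJobIds) {
        return stateDocIds.stream()
            .filter(docId -> {
                // Stand-in for the extract helpers added in this PR,
                // e.g. Classification.extractJobIdFromStateDoc(docId).
                String jobId = extractJobId(docId);
                return jobId != null && existingJobIds.contains(jobId) == false;
            })
            .collect(Collectors.toList());
    }

    static String extractJobId(String docId) {
        int i = docId.lastIndexOf("_classification_state#1");
        return i <= 0 ? null : docId.substring(0, i);
    }

    public static void main(String[] args) {
        List<String> unused = findUnusedStateDocIds(
            List.of("foo_classification_state#1", "bar_classification_state#1"),
            Set.of("foo"));
        System.out.println(unused); // prints: [bar_classification_state#1]
    }
}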

private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";
DeleteExpiredDataIT.java
@@ -87,7 +87,7 @@ public void tearDownData() {
cleanUp();
}

public void testDeleteExpiredDataGivenNothingToDelete() throws Exception {
public void testDeleteExpiredData_GivenNothingToDelete() throws Exception {
// Tests that nothing goes wrong when there's nothing to delete
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
}
@@ -201,10 +201,7 @@ public void testDeleteExpiredData() throws Exception {
assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));

// Now call the action under test
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();

// We need to refresh to ensure the deletion is visible
client().admin().indices().prepareRefresh("*").get();
assertThat(deleteExpiredData().isDeleted(), is(true));

// no-retention job should have kept all data
assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70)));
@@ -21,6 +21,7 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
@@ -45,7 +46,6 @@
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
@@ -40,7 +40,7 @@
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

@@ -205,7 +205,7 @@ protected void assertProgress(String id, int reindexing, int loadingData, int an
}

protected SearchResponse searchStoredProgress(String jobId) {
String docId = DataFrameAnalyticsTask.progressDocId(jobId);
String docId = StoredProgress.documentId(jobId);
return client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setQuery(QueryBuilders.idsQuery().addIds(docId))
.get();
@@ -30,6 +30,7 @@
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@@ -142,6 +143,16 @@ private void waitForPendingTasks() {
}
}

protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE,
new DeleteExpiredDataAction.Request()).get();

// We need to refresh to ensure the deletion is visible
client().admin().indices().prepareRefresh("*").get();

return response;
}

@Override
protected void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
RegressionIT.java
@@ -7,11 +7,13 @@

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
@@ -272,6 +274,38 @@ public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception
assertThat(secondRunTrainingRowsIds, equalTo(firstRunTrainingRowsIds));
}

public void testDeleteExpiredData_RemovesUnusedState() throws Exception {
initialize("regression_delete_expired_data");
indexData(sourceIndex, 100, 0);

DataFrameAnalyticsConfig config = buildAnalytics(jobId, sourceIndex, destIndex, null, new Regression(DEPENDENT_VARIABLE_FIELD));
registerAnalytics(config);
putAnalytics(config);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);

assertProgress(jobId, 100, 100, 100, 100);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertInferenceModelPersisted(jobId);

// Call _delete_expired_data API and check nothing was deleted
assertThat(deleteExpiredData().isDeleted(), is(true));
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());

// Delete the config straight from the config index
DeleteResponse deleteResponse = client().prepareDelete(".ml-config", DataFrameAnalyticsConfig.documentId(jobId))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute().actionGet();
assertThat(deleteResponse.status(), equalTo(RestStatus.OK));

// Now calling the _delete_expired_data API should remove unused state
assertThat(deleteExpiredData().isDeleted(), is(true));

SearchResponse stateIndexSearchResponse = client().prepareSearch(".ml-state").execute().actionGet();
assertThat(stateIndexSearchResponse.getHits().getTotalHits().value, equalTo(0L));
}

private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";
@@ -43,7 +43,7 @@
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
@@ -165,7 +165,7 @@ private void deleteState(ParentTaskAssigningClient parentTaskClient,
DataFrameAnalyticsConfig config,
ActionListener<BulkByScrollResponse> listener) {
List<String> ids = new ArrayList<>();
ids.add(DataFrameAnalyticsTask.progressDocId(config.getId()));
ids.add(StoredProgress.documentId(config.getId()));
if (config.getAnalysis().persistsState()) {
ids.add(config.getAnalysis().getStateDocId(config.getId()));
}
@@ -187,7 +187,7 @@ private void searchStoredProgresses(List<String> configIds, ActionListener<List<
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.source().size(1);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(DataFrameAnalyticsTask.progressDocId(configId)));
searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId)));
multiSearchRequest.add(searchRequest);
}

DataFrameAnalyticsTask.java
@@ -244,7 +244,7 @@ private void persistProgress(Runnable runnable) {
statsResponse -> {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = statsResponse.getResponse().results().get(0);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias());
indexRequest.id(progressDocId(taskParams.getId()));
indexRequest.id(StoredProgress.documentId(taskParams.getId()));
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder jsonBuilder = JsonXContent.contentBuilder()) {
new StoredProgress(stats.getProgress()).toXContent(jsonBuilder, Payload.XContent.EMPTY_PARAMS);
@@ -310,10 +310,6 @@ public static StartingState determineStartingState(String jobId, List<PhaseProgress>
}
}

public static String progressDocId(String id) {
return "data_frame_analytics-" + id + "-progress";
}

public static class ProgressTracker {

public static final String REINDEXING = "reindexing";
StoredProgress.java
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ml.dataframe;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -14,6 +15,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StoredProgress implements ToXContentObject {

@@ -57,4 +60,15 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(progress);
}

public static String documentId(String id) {
return "data_frame_analytics-" + id + "-progress";
}

@Nullable
public static String extractJobIdFromDocId(String docId) {
Pattern pattern = Pattern.compile("^data_frame_analytics-(.*)-progress$");
Matcher matcher = pattern.matcher(docId);
return matcher.find() ? matcher.group(1) : null;
}
}
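As a quick illustration of the round trip between documentId and extractJobIdFromDocId above (stand-alone; the class name is illustrative):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ProgressDocIdRoundTrip {

    // Same pattern as StoredProgress#extractJobIdFromDocId above.
    private static final Pattern PATTERN = Pattern.compile("^data_frame_analytics-(.*)-progress$");

    static String extract(String docId) {
        Matcher matcher = PATTERN.matcher(docId);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        // StoredProgress.documentId("foo") == "data_frame_analytics-foo-progress"
        System.out.println(extract("data_frame_analytics-foo-progress"));          // prints: foo

        // The greedy (.*) keeps job ids that themselves end in "-progress" intact.
        System.out.println(extract("data_frame_analytics-foo-progress-progress")); // prints: foo-progress

        System.out.println(extract("not-a-progress-doc"));                         // prints: null
    }
}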
@@ -16,6 +16,7 @@
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;

import java.io.IOException;
import java.io.InputStream;
BatchedResultsIterator.java
@@ -10,6 +10,7 @@
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.ml.utils.persistence.BatchedDocumentsIterator;

public abstract class BatchedResultsIterator<T> extends BatchedDocumentsIterator<Result<T>> {

Expand Down