Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Job in index: Enable get and update actions for clusterstate jobs #35598

Merged
merged 4 commits into from
Nov 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
Expand Down Expand Up @@ -358,6 +359,16 @@ public static void addDatafeedConfigFields(XContentBuilder builder) throws IOExc
.endObject()
.endObject()
.endObject()
.startObject(DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName())
.startObject(PROPERTIES)
.startObject(DelayedDataCheckConfig.ENABLED.getPreferredName())
.field(TYPE, BOOLEAN)
.endObject()
.startObject(DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(DatafeedConfig.HEADERS.getPreferredName())
.field(ENABLED, false)
.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
Expand Down Expand Up @@ -251,6 +252,9 @@ public final class ReservedFieldNames {
DatafeedConfig.SCRIPT_FIELDS.getPreferredName(),
DatafeedConfig.CHUNKING_CONFIG.getPreferredName(),
DatafeedConfig.HEADERS.getPreferredName(),
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(),
DelayedDataCheckConfig.ENABLED.getPreferredName(),
DelayedDataCheckConfig.CHECK_WINDOW.getPreferredName(),

ChunkingConfig.MODE_FIELD.getPreferredName(),
ChunkingConfig.TIME_SPAN_FIELD.getPreferredName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.List;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -103,6 +104,8 @@ public void testExpandGroupIds() {
assertThat(groupOrJobLookup.expandGroupIds("foo*"), contains("foo-group"));
assertThat(groupOrJobLookup.expandGroupIds("bar-group,nogroup"), contains("bar-group"));
assertThat(groupOrJobLookup.expandGroupIds("*"), contains("bar-group", "foo-group"));
assertThat(groupOrJobLookup.expandGroupIds("foo-group"), contains("foo-group"));
assertThat(groupOrJobLookup.expandGroupIds("no-group"), empty());
}

private static Job mockJob(String jobId, List<String> groups) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public TransportGetBucketsAction(Settings settings, ThreadPool threadPool, Trans

@Override
protected void doExecute(GetBucketsAction.Request request, ActionListener<GetBucketsAction.Response> listener) {
jobManager.getJob(request.getJobId(), ActionListener.wrap(
job -> {
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
jobFound -> {
BucketsQueryBuilder query =
new BucketsQueryBuilder().expand(request.isExpand())
.includeInterim(request.isExcludeInterim() == false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
import org.elasticsearch.xpack.core.ml.action.GetCalendarsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;

Expand All @@ -29,17 +28,17 @@ public class TransportGetCalendarEventsAction extends HandledTransportAction<Get
GetCalendarEventsAction.Response> {

private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final JobManager jobManager;

@Inject
public TransportGetCalendarEventsAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider) {
JobResultsProvider jobResultsProvider, JobManager jobManager) {
super(settings, GetCalendarEventsAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, GetCalendarEventsAction.Request::new);
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.jobManager = jobManager;
}

@Override
Expand All @@ -66,15 +65,13 @@ protected void doExecute(GetCalendarEventsAction.Request request,

if (request.getJobId() != null) {

jobConfigProvider.getJob(request.getJobId(), ActionListener.wrap(
jobBuiler -> {
Job job = jobBuiler.build();
jobManager.getJob(request.getJobId(), ActionListener.wrap(
job -> {
jobResultsProvider.scheduledEventsForJob(request.getJobId(), job.getGroups(), query, eventsListener);

},
jobNotFound -> {
// is the request Id a group?
jobConfigProvider.groupExists(request.getJobId(), ActionListener.wrap(
jobManager.groupExists(request.getJobId(), ActionListener.wrap(
groupExists -> {
if (groupExists) {
jobResultsProvider.scheduledEventsForJob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public TransportGetInfluencersAction(Settings settings, ThreadPool threadPool, T
@Override
protected void doExecute(GetInfluencersAction.Request request, ActionListener<GetInfluencersAction.Response> listener) {

jobManager.getJob(request.getJobId(), ActionListener.wrap(
job -> {
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
jobFound -> {
InfluencersQueryBuilder.InfluencersQuery query = new InfluencersQueryBuilder()
.includeInterim(request.isExcludeInterim() == false)
.start(request.getStart())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public TransportGetRecordsAction(Settings settings, ThreadPool threadPool, Trans
@Override
protected void doExecute(GetRecordsAction.Request request, ActionListener<GetRecordsAction.Response> listener) {

jobManager.getJob(request.getJobId(), ActionListener.wrap(
job -> {
jobManager.jobExists(request.getJobId(), ActionListener.wrap(
jobFound -> {
RecordsQueryBuilder query = new RecordsQueryBuilder()
.includeInterim(request.isExcludeInterim() == false)
.epochStart(request.getStart())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -52,6 +54,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
Expand All @@ -66,6 +69,8 @@
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
Expand Down Expand Up @@ -102,24 +107,26 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final JobManager jobManager;
private final MlMemoryTracker memoryTracker;

@Inject
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
XPackLicenseState licenseState, ClusterService clusterService,
PersistentTasksService persistentTasksService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider,
MlMemoryTracker memoryTracker) {
JobResultsProvider jobResultsProvider, JobManager jobManager,
JobConfigProvider jobConfigProvider, MlMemoryTracker memoryTracker) {
super(settings, OpenJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, OpenJobAction.Request::new);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.jobManager = jobManager;
this.memoryTracker = memoryTracker;
}

Expand Down Expand Up @@ -618,10 +625,10 @@ public void onFailure(Exception e) {
);

// Get the job config
jobConfigProvider.getJob(jobParams.getJobId(), ActionListener.wrap(
builder -> {
jobManager.getJob(jobParams.getJobId(), ActionListener.wrap(
job -> {
try {
jobParams.setJob(builder.build());
jobParams.setJob(job);

// Try adding results doc mapping
addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()),
Expand Down Expand Up @@ -670,16 +677,48 @@ public void onTimeout(TimeValue timeout) {
}

private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();

jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e);
// Not a critical error so continue
boolean jobIsInClusterState = ClusterStateJobUpdate.jobIsInClusterState(clusterService.state(), jobId);
if (jobIsInClusterState) {
clusterService.submitStateUpdateTask("clearing-job-finish-time-for-" + jobId, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(null);

mlMetadataBuilder.putJob(jobBuilder.build(), true);
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.metaData(new MetaData.Builder(currentState.metaData())
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
.build();
}

@Override
public void onFailure(String source, Exception e) {
logger.error("[" + jobId + "] Failed to clear finished_time; source [" + source + "]", e);
listener.onResponse(new AcknowledgedResponse(true));
}
));

@Override
public void clusterStateProcessed(String source, ClusterState oldState,
ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
} else {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();

jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e);
// Not a critical error so continue
listener.onResponse(new AcknowledgedResponse(true));
}
));
}
}

private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;

import java.io.BufferedReader;
import java.io.InputStream;
Expand All @@ -35,29 +37,30 @@
public class TransportPreviewDatafeedAction extends HandledTransportAction<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response> {

private final Client client;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final ClusterService clusterService;
private final JobManager jobManager;
private final DatafeedConfigReader datafeedConfigReader;

@Inject
public TransportPreviewDatafeedAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider) {
Client client, JobManager jobManager, NamedXContentRegistry xContentRegistry,
ClusterService clusterService) {
super(settings, PreviewDatafeedAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
PreviewDatafeedAction.Request::new);
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.clusterService = clusterService;
this.jobManager = jobManager;
this.datafeedConfigReader = new DatafeedConfigReader(client, xContentRegistry);
}

@Override
protected void doExecute(PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {

datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
datafeedConfigBuilder -> {
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
jobBuilder -> {
datafeedConfigReader.datafeedConfig(request.getDatafeedId(), clusterService.state(), ActionListener.wrap(
datafeedConfig -> {
jobManager.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
job -> {
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
Expand All @@ -66,7 +69,7 @@ protected void doExecute(PreviewDatafeedAction.Request request, ActionListener<P
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory.create(client, previewDatafeed.build(), jobBuilder.build(),
DataExtractorFactory.create(client, previewDatafeed.build(), job,
new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
Expand Down
Loading