Skip to content

Commit

Permalink
[ML] JIndex: Job exists and get job should read cluster state first. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Dec 10, 2018
1 parent a2543ec commit a3769aa
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,15 @@ public void groupExists(String groupId, ActionListener<Boolean> listener) {
}

public void jobExists(String jobId, ActionListener<Boolean> listener) {
jobConfigProvider.jobExists(jobId, false, ActionListener.wrap(
jobFound -> {
if (jobFound) {
listener.onResponse(Boolean.TRUE);
} else {
// Look in the clusterstate for the job config
if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) {
listener.onResponse(Boolean.TRUE);
} else {
listener.onFailure(ExceptionsHelper.missingJobException(jobId));
}
}
},
listener::onFailure
));
if (MlMetadata.getMlMetadata(clusterService.state()).getJobs().containsKey(jobId)) {
listener.onResponse(Boolean.TRUE);
} else {
// check the index
jobConfigProvider.jobExists(jobId, true, ActionListener.wrap(
jobFound -> listener.onResponse(jobFound),
listener::onFailure
));
}
}

/**
Expand All @@ -173,33 +167,14 @@ public void jobExists(String jobId, ActionListener<Boolean> listener) {
* a ResourceNotFoundException is returned
*/
public void getJob(String jobId, ActionListener<Job> jobListener) {
jobConfigProvider.getJob(jobId, ActionListener.wrap(
r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here
e -> {
if (e instanceof ResourceNotFoundException) {
// Try to get the job from the cluster state
getJobFromClusterState(jobId, jobListener);
} else {
jobListener.onFailure(e);
}
}
));
}

/**
* Read a job from the cluster state.
* The job is returned on the same thread even though a listener is used.
*
* @param jobId the jobId
* @param jobListener the Job listener. If no job matches {@code jobId}
* a ResourceNotFoundException is returned
*/
private void getJobFromClusterState(String jobId, ActionListener<Job> jobListener) {
Job job = MlMetadata.getMlMetadata(clusterService.state()).getJobs().get(jobId);
if (job == null) {
jobListener.onFailure(ExceptionsHelper.missingJobException(jobId));
} else {
if (job != null) {
jobListener.onResponse(job);
} else {
jobConfigProvider.getJob(jobId, ActionListener.wrap(
r -> jobListener.onResponse(r.build()), // TODO JIndex we shouldn't be building the job here
jobListener::onFailure
));
}
}

Expand Down Expand Up @@ -366,6 +341,22 @@ public void putJob(PutJobAction.Request request, AnalysisRegistry analysisRegist
return;
}

// Check the job id is not the same as a group Id
if (currentMlMetadata.isGroupOrJob(job.getId())) {
actionListener.onFailure(new
ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, job.getId())));
return;
}

// and that the new job's groups are not job Ids
for (String group : job.getGroups()) {
if (currentMlMetadata.getJobs().containsKey(group)) {
actionListener.onFailure(new
ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group)));
return;
}
}

ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
Expand Down Expand Up @@ -446,6 +437,35 @@ public void onFailure(Exception e) {

public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());

if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) {

// check the new groups are not job Ids
for (String group : request.getJobUpdate().getGroups()) {
if (mlMetadata.getJobs().containsKey(group)) {
actionListener.onFailure(new ResourceAlreadyExistsException(
Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, group)));
}
}

jobConfigProvider.jobIdMatches(request.getJobUpdate().getGroups(), ActionListener.wrap(
matchingIds -> {
if (matchingIds.isEmpty()) {
updateJobPostInitialChecks(request, mlMetadata, actionListener);
} else {
actionListener.onFailure(new ResourceAlreadyExistsException(
Messages.getMessage(Messages.JOB_AND_GROUP_NAMES_MUST_BE_UNIQUE, matchingIds.get(0))));
}
},
actionListener::onFailure
));
} else {
updateJobPostInitialChecks(request, mlMetadata, actionListener);
}
}

private void updateJobPostInitialChecks(UpdateJobAction.Request request, MlMetadata mlMetadata,
ActionListener<PutJobAction.Response> actionListener) {
if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) {
updateJobClusterState(request, actionListener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
Expand Down Expand Up @@ -497,7 +498,7 @@ public void testPutJob_AddsCreateTime() throws IOException {
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
JobManager jobManager = createJobManager(mockClientBuilder.build());

PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());

doAnswer(invocation -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocation.getArguments()[1];
Expand Down Expand Up @@ -544,7 +545,7 @@ public void testJobExists_GivenMissingJob() {

doAnswer(invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(false);
listener.onFailure(ExceptionsHelper.missingJobException("non-job"));
return null;
}).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any());

Expand Down Expand Up @@ -579,18 +580,43 @@ public void testJobExists_GivenJobIsInClusterState() {
Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);


JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider);

AtomicBoolean jobExistsHolder = new AtomicBoolean();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobManager.jobExists("cs-job", ActionListener.wrap(
jobExistsHolder::set,
exceptionHolder::set
));

assertTrue(jobExistsHolder.get());
assertNull(exceptionHolder.get());
verify(jobConfigProvider, never()).jobExists(anyString(), anyBoolean(), any());
}

public void testJobExists_GivenJobIsInIndex() {
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build();
when(clusterService.state()).thenReturn(clusterState);

ClusterSettings clusterSettings = new ClusterSettings(environment.settings(),
Collections.singleton(MachineLearningField.MAX_MODEL_MEMORY_LIMIT));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);

JobConfigProvider jobConfigProvider = mock(JobConfigProvider.class);
doAnswer(invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(false);
listener.onResponse(true);
return null;
}).when(jobConfigProvider).jobExists(anyString(), anyBoolean(), any());
}).when(jobConfigProvider).jobExists(eq("index-job"), anyBoolean(), any());

JobManager jobManager = new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
auditor, threadPool, mock(Client.class), updateJobProcessNotifier, jobConfigProvider);

AtomicBoolean jobExistsHolder = new AtomicBoolean();
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobManager.jobExists("cs-job", ActionListener.wrap(
jobManager.jobExists("index-job", ActionListener.wrap(
jobExistsHolder::set,
exceptionHolder::set
));
Expand All @@ -603,7 +629,7 @@ public void testPutJob_ThrowsIfJobExistsInClusterState() throws IOException {
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
JobManager jobManager = createJobManager(mockClientBuilder.build());

PutJobAction.Request putJobRequest = new PutJobAction.Request(createJob());
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());

MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("foo").build(), false);
Expand All @@ -623,6 +649,54 @@ public void onFailure(Exception e) {
});
}

public void testPutJob_ThrowsIfIdIsTheSameAsAGroup() throws IOException {
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
JobManager jobManager = createJobManager(mockClientBuilder.build());


MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
Job.Builder jobBuilder = buildJobBuilder("job-with-group-foo");
jobBuilder.setGroups(Collections.singletonList("foo"));
mlMetadata.putJob(jobBuilder.build(), false);
ClusterState clusterState = ClusterState.builder(new ClusterName("name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, mlMetadata.build())).build();

// job id cannot be a group
PutJobAction.Request putJobRequest = new PutJobAction.Request(createJobFoo());
jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
fail("should have got an error");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof ResourceAlreadyExistsException);
assertEquals("job and group names must be unique but job [foo] and group [foo] have the same name", e.getMessage());
}
});

// the job's groups cannot be job Ids
jobBuilder = buildJobBuilder("job-with-clashing-group-name");
jobBuilder.setCreateTime(null);
jobBuilder.setGroups(Collections.singletonList("job-with-group-foo"));
putJobRequest = new PutJobAction.Request(jobBuilder);

jobManager.putJob(putJobRequest, analysisRegistry, clusterState, new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
fail("should have got an error");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof ResourceAlreadyExistsException);
assertEquals("job and group names must be unique but job [job-with-group-foo] and " +
"group [job-with-group-foo] have the same name", e.getMessage());
}
});
}

public void testNotifyFilterChangedGivenNoop() {
MlFilter filter = MlFilter.builder("my_filter").build();
MockClientBuilder mockClientBuilder = new MockClientBuilder("jobmanager-test");
Expand Down Expand Up @@ -900,7 +974,7 @@ public void testRevertSnapshot_GivenJobInClusterState() {
verify(jobConfigProvider, never()).updateJob(any(), any(), any(), any());
}

private Job.Builder createJob() {
private Job.Builder createJobFoo() {
Detector.Builder d1 = new Detector.Builder("info_content", "domain");
d1.setOverFieldName("client");
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(d1.build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,28 @@
"description":"Can't update all description"
}
- do:
xpack.ml.put_job:
job_id: job-crud-update-group-name-clash
body: >
{
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- do:
catch: "/job and group names must be unique/"
xpack.ml.update_job:
job_id: jobs-crud-update-job
body: >
{
"groups": ["job-crud-update-group-name-clash"]
}
---
"Test cannot decrease model_memory_limit below current usage":
- skip:
Expand Down

0 comments on commit a3769aa

Please sign in to comment.