diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java new file mode 100644 index 0000000000000..e89435e4d3843 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/process/TimingStats.java @@ -0,0 +1,138 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml.job.process; + +import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Stats that give more insight into timing of various operations performed as part of anomaly detection job. + */ +public class TimingStats implements ToXContentObject { + + public static final ParseField BUCKET_COUNT = new ParseField("bucket_count"); + public static final ParseField MIN_BUCKET_PROCESSING_TIME_MS = new ParseField("minimum_bucket_processing_time_ms"); + public static final ParseField MAX_BUCKET_PROCESSING_TIME_MS = new ParseField("maximum_bucket_processing_time_ms"); + public static final ParseField AVG_BUCKET_PROCESSING_TIME_MS = new ParseField("average_bucket_processing_time_ms"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "timing_stats", + true, + args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4])); + + static { + PARSER.declareString(constructorArg(), Job.ID); + PARSER.declareLong(constructorArg(), BUCKET_COUNT); + PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS); + PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS); + PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS); + } + + private final String jobId; + private long bucketCount; + private Double minBucketProcessingTimeMs; + private Double maxBucketProcessingTimeMs; + private Double avgBucketProcessingTimeMs; + + public TimingStats( + String jobId, + long bucketCount, + @Nullable Double minBucketProcessingTimeMs, + @Nullable Double maxBucketProcessingTimeMs, + @Nullable Double avgBucketProcessingTimeMs) { + this.jobId = jobId; + this.bucketCount = bucketCount; + this.minBucketProcessingTimeMs = minBucketProcessingTimeMs; + this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs; + this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs; + } + + public String getJobId() { + return jobId; + } + + public long getBucketCount() { + return bucketCount; + } + + public Double getMinBucketProcessingTimeMs() { + return minBucketProcessingTimeMs; + } + + public Double getMaxBucketProcessingTimeMs() { + return maxBucketProcessingTimeMs; + } + + public Double getAvgBucketProcessingTimeMs() { + return avgBucketProcessingTimeMs; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(Job.ID.getPreferredName(), jobId); + builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); + if (minBucketProcessingTimeMs != null) { + builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs); + } + if (maxBucketProcessingTimeMs != null) { + builder.field(MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), maxBucketProcessingTimeMs); + } + if (avgBucketProcessingTimeMs != null) { + builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if (o == null || getClass() != o.getClass()) return false; + TimingStats that = (TimingStats) o; + return Objects.equals(this.jobId, that.jobId) + && this.bucketCount == that.bucketCount + && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs) + && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs) + && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs); + } + + @Override + public int hashCode() { + return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/stats/JobStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/stats/JobStats.java index df5be4aa4c5cc..078f781d4b21d 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/stats/JobStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/stats/JobStats.java @@ -22,6 +22,7 @@ import org.elasticsearch.client.ml.job.config.JobState; import org.elasticsearch.client.ml.job.process.DataCounts; import org.elasticsearch.client.ml.job.process.ModelSizeStats; +import org.elasticsearch.client.ml.job.process.TimingStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.unit.TimeValue; @@ -42,6 +43,7 @@ public class JobStats implements ToXContentObject { private static final ParseField DATA_COUNTS = new ParseField("data_counts"); private static final ParseField MODEL_SIZE_STATS = new ParseField("model_size_stats"); + private static final ParseField TIMING_STATS = new ParseField("timing_stats"); private static final ParseField FORECASTS_STATS = new ParseField("forecasts_stats"); private static final ParseField STATE = new ParseField("state"); private static final ParseField NODE = new ParseField("node"); @@ -58,6 +60,7 @@ public class JobStats implements ToXContentObject { JobState jobState = (JobState) a[i++]; ModelSizeStats.Builder modelSizeStatsBuilder = (ModelSizeStats.Builder) a[i++]; ModelSizeStats modelSizeStats = modelSizeStatsBuilder == null ? null : modelSizeStatsBuilder.build(); + TimingStats timingStats = (TimingStats) a[i++]; ForecastStats forecastStats = (ForecastStats) a[i++]; NodeAttributes node = (NodeAttributes) a[i++]; String assignmentExplanation = (String) a[i++]; @@ -66,6 +69,7 @@ public class JobStats implements ToXContentObject { dataCounts, jobState, modelSizeStats, + timingStats, forecastStats, node, assignmentExplanation, @@ -80,6 +84,7 @@ public class JobStats implements ToXContentObject { STATE, ObjectParser.ValueType.VALUE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER, MODEL_SIZE_STATS); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), TimingStats.PARSER, TIMING_STATS); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ForecastStats.PARSER, FORECASTS_STATS); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), NodeAttributes.PARSER, NODE); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ASSIGNMENT_EXPLANATION); @@ -94,22 +99,24 @@ public class JobStats implements ToXContentObject { private final DataCounts dataCounts; private final JobState state; private final ModelSizeStats modelSizeStats; + private final TimingStats timingStats; private final ForecastStats forecastStats; private final NodeAttributes node; private final String assignmentExplanation; private final TimeValue openTime; JobStats(String jobId, DataCounts dataCounts, JobState state, @Nullable ModelSizeStats modelSizeStats, - @Nullable ForecastStats forecastStats, @Nullable NodeAttributes node, - @Nullable String assignmentExplanation, @Nullable TimeValue opentime) { + @Nullable TimingStats timingStats, @Nullable ForecastStats forecastStats, @Nullable NodeAttributes node, + @Nullable String assignmentExplanation, @Nullable TimeValue openTime) { this.jobId = Objects.requireNonNull(jobId); this.dataCounts = Objects.requireNonNull(dataCounts); this.state = Objects.requireNonNull(state); this.modelSizeStats = modelSizeStats; + this.timingStats = timingStats; this.forecastStats = forecastStats; this.node = node; this.assignmentExplanation = assignmentExplanation; - this.openTime = opentime; + this.openTime = openTime; } /** @@ -135,6 +142,10 @@ public ModelSizeStats getModelSizeStats() { return modelSizeStats; } + public TimingStats getTimingStats() { + return timingStats; + } + /** * An object that provides statistical information about forecasts of this job. * See {@link ForecastStats} @@ -182,6 +193,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (modelSizeStats != null) { builder.field(MODEL_SIZE_STATS.getPreferredName(), modelSizeStats); } + if (timingStats != null) { + builder.field(TIMING_STATS.getPreferredName(), timingStats); + } if (forecastStats != null) { builder.field(FORECASTS_STATS.getPreferredName(), forecastStats); } @@ -199,7 +213,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public int hashCode() { - return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime); + return Objects.hash(jobId, dataCounts, modelSizeStats, timingStats, forecastStats, state, node, assignmentExplanation, openTime); } @Override @@ -216,6 +230,7 @@ public boolean equals(Object obj) { return Objects.equals(jobId, other.jobId) && Objects.equals(this.dataCounts, other.dataCounts) && Objects.equals(this.modelSizeStats, other.modelSizeStats) && + Objects.equals(this.timingStats, other.timingStats) && Objects.equals(this.forecastStats, other.forecastStats) && Objects.equals(this.state, other.state) && Objects.equals(this.node, other.node) && diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java new file mode 100644 index 0000000000000..18b6b86a0b6f4 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/process/TimingStatsTests.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml.job.process; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class TimingStatsTests extends AbstractXContentTestCase { + + private static final String JOB_ID = "my-job-id"; + + public static TimingStats createTestInstance(String jobId) { + return new TimingStats( + jobId, + randomLong(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble()); + } + + @Override + public TimingStats createTestInstance() { + return createTestInstance(randomAlphaOfLength(10)); + } + + @Override + protected TimingStats doParseInstance(XContentParser parser) { + return TimingStats.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + public void testConstructor() { + TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + + assertThat(stats.getJobId(), equalTo(JOB_ID)); + assertThat(stats.getBucketCount(), equalTo(7L)); + assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0)); + assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0)); + assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23)); + } + + public void testConstructor_NullValues() { + TimingStats stats = new TimingStats(JOB_ID, 7, null, null, null); + + assertThat(stats.getJobId(), equalTo(JOB_ID)); + assertThat(stats.getBucketCount(), equalTo(7L)); + assertNull(stats.getMinBucketProcessingTimeMs()); + assertNull(stats.getMaxBucketProcessingTimeMs()); + assertNull(stats.getAvgBucketProcessingTimeMs()); + } + + public void testEquals() { + TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23); + + assertTrue(stats1.equals(stats1)); + assertTrue(stats1.equals(stats2)); + assertFalse(stats2.equals(stats3)); + } + + public void testHashCode() { + TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23); + + assertEquals(stats1.hashCode(), stats1.hashCode()); + assertEquals(stats1.hashCode(), stats2.hashCode()); + assertNotEquals(stats2.hashCode(), stats3.hashCode()); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/stats/JobStatsTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/stats/JobStatsTests.java index 5d00f879140e0..e7aa33f1d7a3b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/stats/JobStatsTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/stats/JobStatsTests.java @@ -24,6 +24,8 @@ import org.elasticsearch.client.ml.job.process.DataCountsTests; import org.elasticsearch.client.ml.job.process.ModelSizeStats; import org.elasticsearch.client.ml.job.process.ModelSizeStatsTests; +import org.elasticsearch.client.ml.job.process.TimingStats; +import org.elasticsearch.client.ml.job.process.TimingStatsTests; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.client.ml.job.config.JobState; @@ -42,12 +44,14 @@ public static JobStats createRandomInstance() { DataCounts dataCounts = DataCountsTests.createTestInstance(jobId); ModelSizeStats modelSizeStats = randomBoolean() ? ModelSizeStatsTests.createRandomized() : null; + TimingStats timingStats = randomBoolean() ? TimingStatsTests.createTestInstance(jobId) : null; ForecastStats forecastStats = randomBoolean() ? ForecastStatsTests.createRandom(1, 22) : null; NodeAttributes nodeAttributes = randomBoolean() ? NodeAttributesTests.createRandom() : null; String assigmentExplanation = randomBoolean() ? randomAlphaOfLength(10) : null; TimeValue openTime = randomBoolean() ? TimeValue.timeValueMillis(randomIntBetween(1, 10000)) : null; - return new JobStats(jobId, dataCounts, state, modelSizeStats, forecastStats, nodeAttributes, assigmentExplanation, openTime); + return new JobStats( + jobId, dataCounts, state, modelSizeStats, timingStats, forecastStats, nodeAttributes, assigmentExplanation, openTime); } @Override diff --git a/server/src/main/java/org/elasticsearch/common/TriConsumer.java b/server/src/main/java/org/elasticsearch/common/TriConsumer.java new file mode 100644 index 0000000000000..94d3a000f326b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/TriConsumer.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.elasticsearch.common; + +/** + * Represents an operation that accepts three arguments and returns no result. + * + * @param the type of the first argument + * @param the type of the second argument + * @param the type of the third argument + */ +@FunctionalInterface +public interface TriConsumer { + /** + * Applies this function to the given arguments. + * + * @param s the first function argument + * @param t the second function argument + * @param u the third function argument + */ + void apply(S s, T t, U u); +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index b71ca63e3218f..4726f95c80dbc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -38,6 +39,8 @@ import java.util.Map; import java.util.Objects; +import static org.elasticsearch.Version.V_7_3_0; + public class GetJobsStatsAction extends Action { public static final GetJobsStatsAction INSTANCE = new GetJobsStatsAction(); @@ -48,6 +51,7 @@ public class GetJobsStatsAction extends Action { private static final String FORECASTS_STATS = "forecasts_stats"; private static final String STATE = "state"; private static final String NODE = "node"; + private static final String TIMING_STATS = "timing_stats"; private GetJobsStatsAction() { super(NAME); @@ -150,22 +154,24 @@ public static class Response extends BaseTasksResponse implements ToXContentObje public static class JobStats implements ToXContentObject, Writeable { private final String jobId; - private DataCounts dataCounts; + private final DataCounts dataCounts; + @Nullable + private final ModelSizeStats modelSizeStats; @Nullable - private ModelSizeStats modelSizeStats; + private final ForecastStats forecastStats; @Nullable - private ForecastStats forecastStats; + private final TimeValue openTime; + private final JobState state; @Nullable - private TimeValue openTime; - private JobState state; + private final DiscoveryNode node; @Nullable - private DiscoveryNode node; + private final String assignmentExplanation; @Nullable - private String assignmentExplanation; + private final TimingStats timingStats; public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, - @Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node, - @Nullable String assignmentExplanation, @Nullable TimeValue opentime) { + @Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node, + @Nullable String assignmentExplanation, @Nullable TimeValue openTime, @Nullable TimingStats timingStats) { this.jobId = Objects.requireNonNull(jobId); this.dataCounts = Objects.requireNonNull(dataCounts); this.modelSizeStats = modelSizeStats; @@ -173,7 +179,8 @@ public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats mo this.state = Objects.requireNonNull(state); this.node = node; this.assignmentExplanation = assignmentExplanation; - this.openTime = opentime; + this.openTime = openTime; + this.timingStats = timingStats; } public JobStats(StreamInput in) throws IOException { @@ -185,6 +192,11 @@ public JobStats(StreamInput in) throws IOException { assignmentExplanation = in.readOptionalString(); openTime = in.readOptionalTimeValue(); forecastStats = in.readOptionalWriteable(ForecastStats::new); + if (in.getVersion().onOrAfter(V_7_3_0)) { + timingStats = in.readOptionalWriteable(TimingStats::new); + } else { + timingStats = null; + } } public String getJobId() { @@ -219,6 +231,10 @@ public TimeValue getOpenTime() { return openTime; } + public TimingStats getTimingStats() { + return timingStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { // TODO: Have callers wrap the content with an object as they choose rather than forcing it upon them @@ -260,6 +276,9 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc if (openTime != null) { builder.field("open_time", openTime.getStringRep()); } + if (timingStats != null) { + builder.field(TIMING_STATS, timingStats); + } return builder; } @@ -273,11 +292,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(assignmentExplanation); out.writeOptionalTimeValue(openTime); out.writeOptionalWriteable(forecastStats); + if (out.getVersion().onOrAfter(V_7_3_0)) { + out.writeOptionalWriteable(timingStats); + } } @Override public int hashCode() { - return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime); + return Objects.hash( + jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime, timingStats); } @Override @@ -289,14 +312,15 @@ public boolean equals(Object obj) { return false; } JobStats other = (JobStats) obj; - return Objects.equals(jobId, other.jobId) - && Objects.equals(this.dataCounts, other.dataCounts) - && Objects.equals(this.modelSizeStats, other.modelSizeStats) - && Objects.equals(this.forecastStats, other.forecastStats) - && Objects.equals(this.state, other.state) - && Objects.equals(this.node, other.node) - && Objects.equals(this.assignmentExplanation, other.assignmentExplanation) - && Objects.equals(this.openTime, other.openTime); + return Objects.equals(this.jobId, other.jobId) + && Objects.equals(this.dataCounts, other.dataCounts) + && Objects.equals(this.modelSizeStats, other.modelSizeStats) + && Objects.equals(this.forecastStats, other.forecastStats) + && Objects.equals(this.state, other.state) + && Objects.equals(this.node, other.node) + && Objects.equals(this.assignmentExplanation, other.assignmentExplanation) + && Objects.equals(this.openTime, other.openTime) + && Objects.equals(this.timingStats, other.timingStats); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 986c352b9cd7b..fa77f8822c042 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -39,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyCause; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; @@ -449,6 +450,7 @@ public static XContentBuilder resultsMapping(String mappingType, Collection PARSER = + new ConstructingObjectParser<>( + TYPE.getPreferredName(), + true, + args -> new TimingStats((String) args[0], (long) args[1], (Double) args[2], (Double) args[3], (Double) args[4])); + + static { + PARSER.declareString(constructorArg(), Job.ID); + PARSER.declareLong(constructorArg(), BUCKET_COUNT); + PARSER.declareDouble(optionalConstructorArg(), MIN_BUCKET_PROCESSING_TIME_MS); + PARSER.declareDouble(optionalConstructorArg(), MAX_BUCKET_PROCESSING_TIME_MS); + PARSER.declareDouble(optionalConstructorArg(), AVG_BUCKET_PROCESSING_TIME_MS); + } + + public static String documentId(String jobId) { + return jobId + "_timing_stats"; + } + + private final String jobId; + private long bucketCount; + private Double minBucketProcessingTimeMs; + private Double maxBucketProcessingTimeMs; + private Double avgBucketProcessingTimeMs; + + public TimingStats( + String jobId, + long bucketCount, + @Nullable Double minBucketProcessingTimeMs, + @Nullable Double maxBucketProcessingTimeMs, + @Nullable Double avgBucketProcessingTimeMs) { + this.jobId = jobId; + this.bucketCount = bucketCount; + this.minBucketProcessingTimeMs = minBucketProcessingTimeMs; + this.maxBucketProcessingTimeMs = maxBucketProcessingTimeMs; + this.avgBucketProcessingTimeMs = avgBucketProcessingTimeMs; + } + + public TimingStats(String jobId) { + this(jobId, 0, null, null, null); + } + + public TimingStats(TimingStats lhs) { + this(lhs.jobId, lhs.bucketCount, lhs.minBucketProcessingTimeMs, lhs.maxBucketProcessingTimeMs, lhs.avgBucketProcessingTimeMs); + } + + public TimingStats(StreamInput in) throws IOException { + this.jobId = in.readString(); + this.bucketCount = in.readLong(); + this.minBucketProcessingTimeMs = in.readOptionalDouble(); + this.maxBucketProcessingTimeMs = in.readOptionalDouble(); + this.avgBucketProcessingTimeMs = in.readOptionalDouble(); + } + + public String getJobId() { + return jobId; + } + + public long getBucketCount() { + return bucketCount; + } + + public Double getMinBucketProcessingTimeMs() { + return minBucketProcessingTimeMs; + } + + public Double getMaxBucketProcessingTimeMs() { + return maxBucketProcessingTimeMs; + } + + public Double getAvgBucketProcessingTimeMs() { + return avgBucketProcessingTimeMs; + } + + /** + * Updates the statistics (min, max, avg) for the given data point (bucket processing time). + */ + public void updateStats(double bucketProcessingTimeMs) { + if (bucketProcessingTimeMs < 0.0) { + throw new IllegalArgumentException("bucketProcessingTimeMs must be positive, was: " + bucketProcessingTimeMs); + } + if (minBucketProcessingTimeMs == null || bucketProcessingTimeMs < minBucketProcessingTimeMs) { + minBucketProcessingTimeMs = bucketProcessingTimeMs; + } + if (maxBucketProcessingTimeMs == null || bucketProcessingTimeMs > maxBucketProcessingTimeMs) { + maxBucketProcessingTimeMs = bucketProcessingTimeMs; + } + if (avgBucketProcessingTimeMs == null) { + avgBucketProcessingTimeMs = bucketProcessingTimeMs; + } else { + // Calculate the cumulative moving average (see https://en.wikipedia.org/wiki/Moving_average#Cumulative_moving_average) of + // bucket processing times. + avgBucketProcessingTimeMs = (bucketCount * avgBucketProcessingTimeMs + bucketProcessingTimeMs) / (bucketCount + 1); + } + bucketCount++; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(jobId); + out.writeLong(bucketCount); + out.writeOptionalDouble(minBucketProcessingTimeMs); + out.writeOptionalDouble(maxBucketProcessingTimeMs); + out.writeOptionalDouble(avgBucketProcessingTimeMs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Job.ID.getPreferredName(), jobId); + builder.field(BUCKET_COUNT.getPreferredName(), bucketCount); + if (minBucketProcessingTimeMs != null) { + builder.field(MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), minBucketProcessingTimeMs); + } + if (maxBucketProcessingTimeMs != null) { + builder.field(MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), maxBucketProcessingTimeMs); + } + if (avgBucketProcessingTimeMs != null) { + builder.field(AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), avgBucketProcessingTimeMs); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if (o == null || getClass() != o.getClass()) return false; + TimingStats that = (TimingStats) o; + return Objects.equals(this.jobId, that.jobId) + && this.bucketCount == that.bucketCount + && Objects.equals(this.minBucketProcessingTimeMs, that.minBucketProcessingTimeMs) + && Objects.equals(this.maxBucketProcessingTimeMs, that.maxBucketProcessingTimeMs) + && Objects.equals(this.avgBucketProcessingTimeMs, that.avgBucketProcessingTimeMs); + } + + @Override + public int hashCode() { + return Objects.hash(jobId, bucketCount, minBucketProcessingTimeMs, maxBucketProcessingTimeMs, avgBucketProcessingTimeMs); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + /** + * Returns true if given stats objects differ from each other by more than 10% for at least one of the statistics. + */ + public static boolean differSignificantly(TimingStats stats1, TimingStats stats2) { + return differSignificantly(stats1.minBucketProcessingTimeMs, stats2.minBucketProcessingTimeMs) + || differSignificantly(stats1.maxBucketProcessingTimeMs, stats2.maxBucketProcessingTimeMs) + || differSignificantly(stats1.avgBucketProcessingTimeMs, stats2.avgBucketProcessingTimeMs); + } + + /** + * Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO. + * This can be interpreted as values { value1, value2 } differing significantly from each other. + * This method also returns: + * - {@code true} in case one value is {@code null} while the other is not. + * - {@code false} in case both values are {@code null}. + */ + static boolean differSignificantly(Double value1, Double value2) { + if (value1 != null && value2 != null) { + return (value2 / value1 < MIN_VALID_RATIO) || (value1 / value2 < MIN_VALID_RATIO); + } + return (value1 != null) || (value2 != null); + } + + /** + * Minimum ratio of values that is interpreted as values being similar. + * If the values ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different. + */ + private static final double MIN_VALID_RATIO = 0.9; +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java index 51bdc5ce594ad..b7f304f8565a8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ReservedFieldNames.java @@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import java.util.Arrays; import java.util.HashSet; @@ -174,6 +175,11 @@ public final class ReservedFieldNames { Result.TIMESTAMP.getPreferredName(), Result.IS_INTERIM.getPreferredName(), + TimingStats.BUCKET_COUNT.getPreferredName(), + TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), + TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), + TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), + GetResult._ID, GetResult._INDEX, GetResult._TYPE diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java index 78513130bf25f..c0122fbe19033 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java @@ -18,6 +18,8 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStatsTests; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; @@ -38,35 +40,19 @@ protected Response createTestInstance() { List jobStatsList = new ArrayList<>(listSize); for (int j = 0; j < listSize; j++) { String jobId = randomAlphaOfLength(10); - DataCounts dataCounts = new DataCountsTests().createTestInstance(); - - ModelSizeStats sizeStats = null; - if (randomBoolean()) { - sizeStats = new ModelSizeStats.Builder("foo").build(); - } - - ForecastStats forecastStats = null; - if (randomBoolean()) { - forecastStats = new ForecastStatsTests().createTestInstance(); - } - + ModelSizeStats sizeStats = randomBoolean() ? null : new ModelSizeStats.Builder("foo").build(); + ForecastStats forecastStats = randomBoolean() ? null : new ForecastStatsTests().createTestInstance(); JobState jobState = randomFrom(EnumSet.allOf(JobState.class)); - - DiscoveryNode node = null; - if (randomBoolean()) { - node = new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT); - } - String explanation = null; - if (randomBoolean()) { - explanation = randomAlphaOfLength(3); - } - TimeValue openTime = null; - if (randomBoolean()) { - openTime = parseTimeValue(randomPositiveTimeValue(), "open_time-Test"); - } - Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, forecastStats, jobState, node, explanation, - openTime); + DiscoveryNode node = + randomBoolean() + ? null + : new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT); + String explanation = randomBoolean() ? null : randomAlphaOfLength(3); + TimeValue openTime = randomBoolean() ? null : parseTimeValue(randomPositiveTimeValue(), "open_time-Test"); + TimingStats timingStats = randomBoolean() ? null : TimingStatsTests.createTestInstance("foo"); + Response.JobStats jobStats = + new Response.JobStats(jobId, dataCounts, sizeStats, forecastStats, jobState, node, explanation, openTime, timingStats); jobStatsList.add(jobStats); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java index 42e328e3591e0..59b8049cea398 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames; @@ -76,10 +77,12 @@ public void testResultsMapppingReservedFields() throws Exception { // These are not reserved because they're data types, not field names overridden.add(Result.TYPE.getPreferredName()); overridden.add(DataCounts.TYPE.getPreferredName()); + overridden.add(TimingStats.TYPE.getPreferredName()); overridden.add(CategoryDefinition.TYPE.getPreferredName()); overridden.add(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()); overridden.add(ModelSnapshot.TYPE.getPreferredName()); overridden.add(Quantiles.TYPE.getPreferredName()); + overridden.add(TimingStats.TYPE.getPreferredName()); Set expected = collectResultsDocFieldNames(); expected.removeAll(overridden); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java new file mode 100644 index 0000000000000..309f844524d95 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/state/TimingStatsTests.java @@ -0,0 +1,146 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.job.process.autodetect.state; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class TimingStatsTests extends AbstractSerializingTestCase { + + private static final String JOB_ID = "my-job-id"; + + public static TimingStats createTestInstance(String jobId) { + return new TimingStats( + jobId, + randomLong(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble(), + randomBoolean() ? null : randomDouble()); + } + + @Override + public TimingStats createTestInstance() { + return createTestInstance(randomAlphaOfLength(10)); + } + + @Override + protected Writeable.Reader instanceReader() { + return TimingStats::new; + } + + @Override + protected TimingStats doParseInstance(XContentParser parser) { + return TimingStats.PARSER.apply(parser, null); + } + + public void testEquals() { + TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23); + + assertTrue(stats1.equals(stats1)); + assertTrue(stats1.equals(stats2)); + assertFalse(stats2.equals(stats3)); + } + + public void testHashCode() { + TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats2 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats3 = new TimingStats(JOB_ID, 7, 1.0, 3.0, 1.23); + + assertEquals(stats1.hashCode(), stats1.hashCode()); + assertEquals(stats1.hashCode(), stats2.hashCode()); + assertNotEquals(stats2.hashCode(), stats3.hashCode()); + } + + public void testDefaultConstructor() { + TimingStats stats = new TimingStats(JOB_ID); + + assertThat(stats.getJobId(), equalTo(JOB_ID)); + assertThat(stats.getBucketCount(), equalTo(0L)); + assertThat(stats.getMinBucketProcessingTimeMs(), nullValue()); + assertThat(stats.getMaxBucketProcessingTimeMs(), nullValue()); + assertThat(stats.getAvgBucketProcessingTimeMs(), nullValue()); + } + + public void testConstructor() { + TimingStats stats = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + + assertThat(stats.getJobId(), equalTo(JOB_ID)); + assertThat(stats.getBucketCount(), equalTo(7L)); + assertThat(stats.getMinBucketProcessingTimeMs(), equalTo(1.0)); + assertThat(stats.getMaxBucketProcessingTimeMs(), equalTo(2.0)); + assertThat(stats.getAvgBucketProcessingTimeMs(), equalTo(1.23)); + } + + public void testCopyConstructor() { + TimingStats stats1 = new TimingStats(JOB_ID, 7, 1.0, 2.0, 1.23); + TimingStats stats2 = new TimingStats(stats1); + + assertThat(stats2.getJobId(), equalTo(JOB_ID)); + assertThat(stats2.getBucketCount(), equalTo(7L)); + assertThat(stats2.getMinBucketProcessingTimeMs(), equalTo(1.0)); + assertThat(stats2.getMaxBucketProcessingTimeMs(), equalTo(2.0)); + assertThat(stats2.getAvgBucketProcessingTimeMs(), equalTo(1.23)); + assertEquals(stats1, stats2); + assertEquals(stats1.hashCode(), stats2.hashCode()); + } + + public void testUpdateStats() { + TimingStats stats = new TimingStats(JOB_ID); + + stats.updateStats(3); + assertThat(stats, equalTo(new TimingStats(JOB_ID, 1, 3.0, 3.0, 3.0))); + + stats.updateStats(2); + assertThat(stats, equalTo(new TimingStats(JOB_ID, 2, 2.0, 3.0, 2.5))); + + stats.updateStats(4); + assertThat(stats, equalTo(new TimingStats(JOB_ID, 3, 2.0, 4.0, 3.0))); + + stats.updateStats(1); + assertThat(stats, equalTo(new TimingStats(JOB_ID, 4, 1.0, 4.0, 2.5))); + + stats.updateStats(5); + assertThat(stats, equalTo(new TimingStats(JOB_ID, 5, 1.0, 5.0, 3.0))); + } + + public void testDocumentId() { + assertThat(TimingStats.documentId("my-job-id"), equalTo("my-job-id_timing_stats")); + } + + public void testTimingStatsDifferSignificantly() { + assertThat( + TimingStats.differSignificantly( + new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0)), + is(false)); + assertThat( + TimingStats.differSignificantly( + new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 11.0, 1.0)), + is(false)); + assertThat( + TimingStats.differSignificantly( + new TimingStats(JOB_ID, 10, 10.0, 10.0, 1.0), new TimingStats(JOB_ID, 10, 10.0, 12.0, 1.0)), + is(true)); + } + + public void testValuesDifferSignificantly() { + assertThat(TimingStats.differSignificantly((Double) null, (Double) null), is(false)); + assertThat(TimingStats.differSignificantly(1.0, null), is(true)); + assertThat(TimingStats.differSignificantly(null, 1.0), is(true)); + assertThat(TimingStats.differSignificantly(0.9, 1.0), is(false)); + assertThat(TimingStats.differSignificantly(1.0, 0.9), is(false)); + assertThat(TimingStats.differSignificantly(0.9, 1.000001), is(true)); + assertThat(TimingStats.differSignificantly(1.0, 0.899999), is(true)); + assertThat(TimingStats.differSignificantly(0.0, 1.0), is(true)); + assertThat(TimingStats.differSignificantly(1.0, 0.0), is(true)); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index eb9d4c9239d22..2ad70e996b51f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; @@ -29,6 +30,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; @@ -42,7 +44,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -103,16 +104,19 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo String jobId = task.getJobId(); ClusterState state = clusterService.state(); PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - Optional> stats = processManager.getStatistics(task); + Optional>> stats = processManager.getStatistics(task); if (stats.isPresent()) { + DataCounts dataCounts = stats.get().v1(); + ModelSizeStats modelSizeStats = stats.get().v2().v1(); + TimingStats timingStats = stats.get().v2().v2(); PersistentTasksCustomMetaData.PersistentTask pTask = MlTasks.getJobTask(jobId, tasks); DiscoveryNode node = state.nodes().get(pTask.getExecutorNode()); JobState jobState = MlTasks.getJobState(jobId, tasks); String assignmentExplanation = pTask.getAssignment().getExplanation(); TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task)); gatherForecastStats(jobId, forecastStats -> { - JobStats jobStats = new JobStats(jobId, stats.get().v1(), - stats.get().v2(), forecastStats, jobState, node, assignmentExplanation, openTime); + JobStats jobStats = new JobStats( + jobId, dataCounts, modelSizeStats, forecastStats, jobState, node, assignmentExplanation, openTime, timingStats); listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); }, listener::onFailure); @@ -138,7 +142,7 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc int slot = i; String jobId = closedJobIds.get(i); gatherForecastStats(jobId, forecastStats -> { - gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { + gatherDataCountsModelSizeStatsAndTimingStats(jobId, (dataCounts, modelSizeStats, timingStats) -> { JobState jobState = MlTasks.getJobState(jobId, tasks); PersistentTasksCustomMetaData.PersistentTask pTask = MlTasks.getJobTask(jobId, tasks); String assignmentExplanation = null; @@ -146,7 +150,7 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc assignmentExplanation = pTask.getAssignment().getExplanation(); } jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState, - null, assignmentExplanation, null)); + null, assignmentExplanation, null, timingStats)); if (counter.decrementAndGet() == 0) { List results = response.getResponse().results(); results.addAll(jobStats.asList()); @@ -163,11 +167,13 @@ void gatherForecastStats(String jobId, Consumer handler, Consumer jobResultsProvider.getForecastStats(jobId, handler, errorHandler); } - void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer handler, - Consumer errorHandler) { + void gatherDataCountsModelSizeStatsAndTimingStats( + String jobId, TriConsumer handler, Consumer errorHandler) { jobResultsProvider.dataCounts(jobId, dataCounts -> { jobResultsProvider.modelSizeStats(jobId, modelSizeStats -> { - handler.accept(dataCounts, modelSizeStats); + jobResultsProvider.timingStats(jobId, timingStats -> { + handler.apply(dataCounts, modelSizeStats, timingStats); + }, errorHandler); }, errorHandler); }, errorHandler); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index fc5b87d4afebe..b7d5214ecf304 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; @@ -121,6 +122,17 @@ private void persistBucketInfluencersStandalone(String jobId, List handler, Consumer errorHandler) { + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + searchSingleResult( + jobId, + TimingStats.TYPE.getPreferredName(), + createTimingStatsSearch(indexName, jobId), + TimingStats.PARSER, + result -> handler.accept(result.result), + errorHandler, + () -> new TimingStats(jobId)); + } + + private SearchRequestBuilder createTimingStatsSearch(String indexName, String jobId) { + return client.prepareSearch(indexName) + .setSize(1) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setQuery(QueryBuilders.idsQuery().addIds(TimingStats.documentId(jobId))); + } + public void getAutodetectParams(Job job, Consumer consumer, Consumer errorHandler) { String jobId = job.getId(); @@ -443,6 +468,7 @@ public void getAutodetectParams(Job job, Consumer consumer, Co MultiSearchRequestBuilder msearch = client.prepareMultiSearch() .add(createLatestDataCountsSearch(resultsIndex, jobId)) .add(createLatestModelSizeStatsSearch(resultsIndex)) + .add(createTimingStatsSearch(resultsIndex, jobId)) // These next two document IDs never need to be the legacy ones due to the rule // that you cannot open a 5.4 job in a subsequent version of the product .add(createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, job.getModelSnapshotId()))) @@ -504,6 +530,8 @@ private void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builde String hitId = hit.getId(); if (DataCounts.documentId(jobId).equals(hitId)) { paramsBuilder.setDataCounts(parseSearchHit(hit, DataCounts.PARSER, errorHandler)); + } else if (TimingStats.documentId(jobId).equals(hitId)) { + paramsBuilder.setTimingStats(parseSearchHit(hit, TimingStats.PARSER, errorHandler)); } else if (hitId.startsWith(ModelSizeStats.documentIdPrefix(jobId))) { ModelSizeStats.Builder modelSizeStats = parseSearchHit(hit, ModelSizeStats.LENIENT_PARSER, errorHandler); paramsBuilder.setModelSizeStats(modelSizeStats == null ? null : modelSizeStats.build()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 88749c24ee9e2..bc78fd6eba7ab 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer; @@ -334,6 +335,10 @@ public ModelSizeStats getModelSizeStats() { return autoDetectResultProcessor.modelSizeStats(); } + public TimingStats getTimingStats() { + return autoDetectResultProcessor.timingStats(); + } + public DataCounts getDataCounts() { return dataCountsReporter.runningTotalStats(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index cbcaf54b46b9e..513661a9794c6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; @@ -509,8 +510,15 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, onProcessCrash(jobTask)); - AutoDetectResultProcessor processor = new AutoDetectResultProcessor( - client, auditor, jobId, renormalizer, jobResultsPersister, autodetectParams.modelSizeStats()); + AutoDetectResultProcessor processor = + new AutoDetectResultProcessor( + client, + auditor, + jobId, + renormalizer, + jobResultsPersister, + autodetectParams.modelSizeStats(), + autodetectParams.timingStats()); ExecutorService autodetectWorkerExecutor; try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService); @@ -716,12 +724,13 @@ public void onFailure(Exception e) { }); } - public Optional> getStatistics(JobTask jobTask) { + public Optional>> getStatistics(JobTask jobTask) { AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask); if (communicator == null) { return Optional.empty(); } - return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats())); + return Optional.of( + new Tuple<>(communicator.getDataCounts(), new Tuple<>(communicator.getModelSizeStats(), communicator.getTimingStats()))); } ExecutorService createAutodetectExecutorService(ExecutorService executorService) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index 4516686b65202..37e2d626b4c45 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; @@ -89,13 +90,26 @@ public class AutoDetectResultProcessor { */ private volatile ModelSizeStats latestModelSizeStats; + /** + * Current timing stats + */ + private volatile TimingStats timingStats; + + /** + * Persisted timing stats. May be stale + */ + private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile + public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, - JobResultsPersister persister, ModelSizeStats latestModelSizeStats) { - this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, new FlushListener()); + JobResultsPersister persister, + ModelSizeStats latestModelSizeStats, + TimingStats timingStats) { + this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener()); } AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, - JobResultsPersister persister, ModelSizeStats latestModelSizeStats, FlushListener flushListener) { + JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats, + FlushListener flushListener) { this.client = Objects.requireNonNull(client); this.auditor = Objects.requireNonNull(auditor); this.jobId = Objects.requireNonNull(jobId); @@ -103,6 +117,8 @@ public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, R this.persister = Objects.requireNonNull(persister); this.flushListener = Objects.requireNonNull(flushListener); this.latestModelSizeStats = Objects.requireNonNull(latestModelSizeStats); + this.persistedTimingStats = Objects.requireNonNull(timingStats); + this.timingStats = new TimingStats(persistedTimingStats); } public void process(AutodetectProcess process) { @@ -116,7 +132,9 @@ public void process(AutodetectProcess process) { try { if (processKilled == false) { - context.bulkResultsPersister.executeRequest(); + context.bulkResultsPersister + .persistTimingStats(timingStats) + .executeRequest(); } } catch (Exception e) { LOGGER.warn(new ParameterizedMessage("[{}] Error persisting autodetect results", jobId), e); @@ -194,7 +212,9 @@ void processResult(Context context, AutodetectResult result) { // persist after deleting interim results in case the new // results are also interim + processTimingStats(context, bucket.getProcessingTimeMs()); context.bulkResultsPersister.persistBucket(bucket).executeRequest(); + ++bucketCount; } List records = result.getRecords(); @@ -277,6 +297,15 @@ void processResult(Context context, AutodetectResult result) { } } + private void processTimingStats(Context context, long bucketProcessingTimeMs) { + timingStats.updateStats(bucketProcessingTimeMs); + if (TimingStats.differSignificantly(timingStats, persistedTimingStats)) { + context.bulkResultsPersister.persistTimingStats(timingStats); + persistedTimingStats = timingStats; + timingStats = new TimingStats(persistedTimingStats); + } + } + private void processModelSizeStats(Context context, ModelSizeStats modelSizeStats) { LOGGER.trace("[{}] Parsed ModelSizeStats: {} / {} / {} / {} / {} / {}", context.jobId, modelSizeStats.getModelBytes(), modelSizeStats.getTotalByFieldCount(), @@ -407,5 +436,8 @@ static class Context { public ModelSizeStats modelSizeStats() { return latestModelSizeStats; } -} + public TimingStats timingStats() { + return timingStats; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java index 9421a3662e1ad..7c56d9db1d08c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParams.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import java.util.ArrayList; import java.util.HashSet; @@ -24,6 +25,8 @@ public class AutodetectParams { private final DataCounts dataCounts; private final ModelSizeStats modelSizeStats; @Nullable + private final TimingStats timingStats; + @Nullable private final ModelSnapshot modelSnapshot; @Nullable private final Quantiles quantiles; @@ -31,12 +34,13 @@ public class AutodetectParams { private final List scheduledEvents; - private AutodetectParams(DataCounts dataCounts, ModelSizeStats modelSizeStats, + private AutodetectParams(DataCounts dataCounts, ModelSizeStats modelSizeStats, TimingStats timingStats, @Nullable ModelSnapshot modelSnapshot, @Nullable Quantiles quantiles, Set filters, List scheduledEvents) { this.dataCounts = Objects.requireNonNull(dataCounts); this.modelSizeStats = Objects.requireNonNull(modelSizeStats); + this.timingStats = timingStats; this.modelSnapshot = modelSnapshot; this.quantiles = quantiles; this.filters = filters; @@ -51,6 +55,10 @@ public ModelSizeStats modelSizeStats() { return modelSizeStats; } + public TimingStats timingStats() { + return timingStats; + } + @Nullable public ModelSnapshot modelSnapshot() { return modelSnapshot; @@ -83,6 +91,7 @@ public boolean equals(Object other) { return Objects.equals(this.dataCounts, that.dataCounts) && Objects.equals(this.modelSizeStats, that.modelSizeStats) + && Objects.equals(this.timingStats, that.timingStats) && Objects.equals(this.modelSnapshot, that.modelSnapshot) && Objects.equals(this.quantiles, that.quantiles) && Objects.equals(this.filters, that.filters) @@ -91,13 +100,14 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(dataCounts, modelSizeStats, modelSnapshot, quantiles, filters, scheduledEvents); + return Objects.hash(dataCounts, modelSizeStats, timingStats, modelSnapshot, quantiles, filters, scheduledEvents); } public static class Builder { private DataCounts dataCounts; private ModelSizeStats modelSizeStats; + private TimingStats timingStats; private ModelSnapshot modelSnapshot; private Quantiles quantiles; private Set filters; @@ -106,6 +116,7 @@ public static class Builder { public Builder(String jobId) { dataCounts = new DataCounts(jobId); modelSizeStats = new ModelSizeStats.Builder(jobId).build(); + timingStats = new TimingStats(jobId); filters = new HashSet<>(); scheduledEvents = new ArrayList<>(); } @@ -124,6 +135,11 @@ public Builder setModelSizeStats(ModelSizeStats modelSizeStats) { return this; } + public Builder setTimingStats(TimingStats timingStats) { + this.timingStats = new TimingStats(timingStats); + return this; + } + public Builder setModelSnapshot(ModelSnapshot modelSnapshot) { this.modelSnapshot = modelSnapshot; return this; @@ -150,7 +166,7 @@ public Builder setFilters(Set filters) { } public AutodetectParams build() { - return new AutodetectParams(dataCounts, modelSizeStats, modelSnapshot, quantiles, + return new AutodetectParams(dataCounts, modelSizeStats, timingStats, modelSnapshot, quantiles, filters, scheduledEvents); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java index eb5a5a3dda526..73eb9a1c7adca 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import java.time.Duration; import java.util.Arrays; @@ -27,8 +28,11 @@ public void testDetermineJobIds() { assertEquals(1, result.size()); assertEquals("id1", result.get(0)); - result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null))); + result = determineJobIdsWithoutLiveStats( + Collections.singletonList("id1"), + Collections.singletonList( + new GetJobsStatsAction.Response.JobStats( + "id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null, new TimingStats("id1")))); assertEquals(0, result.size()); result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.emptyList()); @@ -39,23 +43,28 @@ public void testDetermineJobIds() { result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, - JobState.OPENED, null, null, null)) + JobState.OPENED, null, null, null, new TimingStats("id1"))) ); assertEquals(2, result.size()); assertEquals("id2", result.get(0)); assertEquals("id3", result.get(1)); result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null) + new GetJobsStatsAction.Response.JobStats( + "id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null, new TimingStats("id1")), + new GetJobsStatsAction.Response.JobStats( + "id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null, new TimingStats("id3")) )); assertEquals(1, result.size()); assertEquals("id2", result.get(0)); result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null))); + new GetJobsStatsAction.Response.JobStats( + "id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null, new TimingStats("id1")), + new GetJobsStatsAction.Response.JobStats( + "id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null, new TimingStats("id2")), + new GetJobsStatsAction.Response.JobStats( + "id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null, new TimingStats("id3")))); assertEquals(0, result.size()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index b778e4d06fe8e..bd43e7b6433df 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; @@ -89,7 +90,7 @@ public void createComponents() throws Exception { renormalizer = mock(Renormalizer.class); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer, - new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build()) { + new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) { @Override protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) { capturedUpdateModelSnapshotOnJobRequests.add(modelSnapshot); @@ -116,8 +117,8 @@ public void testProcessResults() throws Exception { builder.addInfluencers(influencers); CategoryDefinition categoryDefinition = createCategoryDefinition(); builder.addCategoryDefinition(categoryDefinition); - ModelPlot modelPlot = createmodelPlot(); - builder.addmodelPlot(modelPlot); + ModelPlot modelPlot = createModelPlot(); + builder.addModelPlot(modelPlot); ModelSizeStats modelSizeStats = createModelSizeStats(); builder.addModelSizeStats(modelSizeStats); ModelSnapshot modelSnapshot = createModelSnapshot(); @@ -326,7 +327,7 @@ private CategoryDefinition createCategoryDefinition() { return new CategoryDefinitionTests().createTestInstance(JOB_ID); } - private ModelPlot createmodelPlot() { + private ModelPlot createModelPlot() { return new ModelPlotTests().createTestInstance(JOB_ID); } @@ -379,7 +380,7 @@ ResultsBuilder addCategoryDefinition(CategoryDefinition categoryDefinition) { return this; } - ResultsBuilder addmodelPlot(ModelPlot modelPlot) { + ResultsBuilder addModelPlot(ModelPlot modelPlot) { results.add(new AutodetectResult(null, null, null, null, null, null, modelPlot, null, null, null, null)); return this; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java index 0035531d55a57..52a7f15262159 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersisterTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; @@ -27,7 +28,9 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Map; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -195,6 +198,34 @@ public void testBulkRequestExecutesWhenReachMaxDocs() { verifyNoMoreInteractions(client); } + public void testPersistTimingStats() { + ArgumentCaptor bulkRequestCaptor = ArgumentCaptor.forClass(BulkRequest.class); + Client client = mockClient(bulkRequestCaptor); + + JobResultsPersister persister = new JobResultsPersister(client); + TimingStats timingStats = new TimingStats("foo", 7, 1.0, 2.0, 1.23); + persister.bulkPersisterBuilder(JOB_ID).persistTimingStats(timingStats).executeRequest(); + + verify(client, times(1)).bulk(bulkRequestCaptor.capture()); + BulkRequest bulkRequest = bulkRequestCaptor.getValue(); + assertThat(bulkRequest.requests().size(), equalTo(1)); + IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(0); + assertThat(indexRequest.index(), equalTo(".ml-anomalies-.write-foo")); + assertThat(indexRequest.id(), equalTo("foo_timing_stats")); + assertThat( + indexRequest.sourceAsMap(), + equalTo( + Map.of( + "job_id", "foo", + "bucket_count", 7, + "minimum_bucket_processing_time_ms", 1.0, + "maximum_bucket_processing_time_ms", 2.0, + "average_bucket_processing_time_ms", 1.23))); + + verify(client, times(1)).threadPool(); + verifyNoMoreInteractions(client); + } + @SuppressWarnings({"unchecked"}) private Client mockClient(ArgumentCaptor captor) { Client client = mock(Client.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java index 2513aeac596c7..de260a741841f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProviderTests.java @@ -13,7 +13,9 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -44,6 +46,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; @@ -54,6 +57,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -70,6 +74,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class JobResultsProviderTests extends ESTestCase { @@ -824,6 +829,55 @@ public void testCountFields() { assertEquals(7, JobResultsProvider.countFields(Collections.singletonMap("properties", mapping))); } + public void testTimingStats_Ok() throws IOException { + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName("foo"); + List> source = + Arrays.asList( + Map.of( + Job.ID.getPreferredName(), "foo", + TimingStats.BUCKET_COUNT.getPreferredName(), 7, + TimingStats.MIN_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1.0, + TimingStats.MAX_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 1000.0, + TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(), 666.0)); + SearchResponse response = createSearchResponse(source); + Client client = getMockedClient( + queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")), + response); + + when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName)); + JobResultsProvider provider = createProvider(client); + provider.timingStats( + "foo", + stats -> assertThat(stats, equalTo(new TimingStats("foo", 7, 1.0, 1000.0, 666.0))), + e -> { throw new AssertionError(); }); + + verify(client).prepareSearch(indexName); + verify(client).threadPool(); + verify(client).search(any(SearchRequest.class), any(ActionListener.class)); + verifyNoMoreInteractions(client); + } + + public void testTimingStats_NotFound() throws IOException { + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName("foo"); + List> source = new ArrayList<>(); + SearchResponse response = createSearchResponse(source); + Client client = getMockedClient( + queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")), + response); + + when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName)); + JobResultsProvider provider = createProvider(client); + provider.timingStats( + "foo", + stats -> assertThat(stats, equalTo(new TimingStats("foo"))), + e -> { throw new AssertionError(); }); + + verify(client).prepareSearch(indexName); + verify(client).threadPool(); + verify(client).search(any(SearchRequest.class), any(ActionListener.class)); + verifyNoMoreInteractions(client); + } + private Bucket createBucketAtEpochTime(long epoch) { return new Bucket("foo", new Date(epoch), 123); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 40a1d3f969157..bb151ecefb354 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; @@ -93,8 +94,15 @@ public void setUpMocks() { when(persister.persistModelSnapshot(any(), any())) .thenReturn(new IndexResponse(new ShardId("ml", "uid", 0), "doc", "1", 0L, 0L, 0L, true)); flushListener = mock(FlushListener.class); - processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, - new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), flushListener); + processorUnderTest = new AutoDetectResultProcessor( + client, + auditor, + JOB_ID, + renormalizer, + persister, + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), + new TimingStats(JOB_ID), + flushListener); } @After @@ -121,6 +129,7 @@ public void testProcess() throws TimeoutException { public void testProcessResult_bucket() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); @@ -130,6 +139,7 @@ public void testProcessResult_bucket() { when(result.getBucket()).thenReturn(bucket); processorUnderTest.processResult(context, result); + verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class)); verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).executeRequest(); verify(persister, never()).deleteInterimResults(JOB_ID); @@ -139,6 +149,7 @@ public void testProcessResult_bucket() { public void testProcessResult_bucket_deleteInterimRequired() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + when(bulkBuilder.persistTimingStats(any(TimingStats.class))).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); @@ -148,6 +159,7 @@ public void testProcessResult_bucket_deleteInterimRequired() { when(result.getBucket()).thenReturn(bucket); processorUnderTest.processResult(context, result); + verify(bulkBuilder, times(1)).persistTimingStats(any(TimingStats.class)); verify(bulkBuilder, times(1)).persistBucket(bucket); verify(bulkBuilder, times(1)).executeRequest(); verify(persister, times(1)).deleteInterimResults(JOB_ID); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java new file mode 100644 index 0000000000000..6fbc051c17107 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/AutodetectParamsTests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.job.process.autodetect.params; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; + +import static org.hamcrest.Matchers.equalTo; + +public class AutodetectParamsTests extends ESTestCase { + + private static final String JOB_ID = "my-job"; + + public void testBuilder_WithTimingStats() { + TimingStats timingStats = new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0); + AutodetectParams params = new AutodetectParams.Builder(JOB_ID).setTimingStats(timingStats).build(); + assertThat(params.timingStats(), equalTo(timingStats)); + + timingStats.updateStats(2000.0); + assertThat(timingStats, equalTo(new TimingStats(JOB_ID, 8, 1.0, 2000.0, 832.75))); + assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID, 7, 1.0, 1000.0, 666.0))); + } + + public void testBuilder_WithoutTimingStats() { + AutodetectParams params = new AutodetectParams.Builder(JOB_ID).build(); + assertThat(params.timingStats(), equalTo(new TimingStats(JOB_ID))); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java index a49ef0a5e26fa..aa56e392de2bb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/results/BucketTests.java @@ -61,7 +61,7 @@ public Bucket createTestInstance(String jobId) { bucket.setInterim(randomBoolean()); } if (randomBoolean()) { - bucket.setProcessingTimeMs(randomLong()); + bucket.setProcessingTimeMs(randomNonNegativeLong()); } if (randomBoolean()) { int size = randomInt(10); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java index 2bc3ac48b99ec..2bbe7f6d2523b 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; @@ -101,8 +102,9 @@ public void testToXContent() throws IOException { final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7); final ForecastStats forecastStats = new ForecastStats(); - final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, - "_explanation", time); + final TimingStats timingStats = new TimingStats("_job_id", 100, 10.0, 30.0, 20.0); + final JobStats jobStats = new JobStats( + "_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, "_explanation", time, timingStats); final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L); final JobStatsMonitoringDoc document = new JobStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, jobStats); @@ -168,8 +170,15 @@ public void testToXContent() throws IOException { + "}" + "}," + "\"assignment_explanation\":\"_explanation\"," - + "\"open_time\":\"13h\"" - + "}" + + "\"open_time\":\"13h\"," + + "\"timing_stats\":{" + + "\"job_id\":\"_job_id\"," + + "\"bucket_count\":100," + + "\"minimum_bucket_processing_time_ms\":10.0," + + "\"maximum_bucket_processing_time_ms\":30.0," + + "\"average_bucket_processing_time_ms\":20.0" + + "}" + + "}" + "}", xContent.utf8ToString()); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml index eb3a73424a601..a8e4bf90d4d77 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/index_layout.yml @@ -124,7 +124,7 @@ setup: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser count: index: .ml-anomalies-shared - - match: {count: 6} + - match: {count: 8} - do: headers: @@ -138,7 +138,7 @@ setup: term: job_id: index-layout-job - - match: {count: 3} + - match: {count: 4} - do: headers: @@ -152,7 +152,7 @@ setup: term: job_id: index-layout-job - - match: {count: 3} + - match: {count: 4} - do: headers: @@ -166,7 +166,7 @@ setup: term: job_id: index-layout-job2 - - match: {count: 3} + - match: {count: 4} - do: headers: @@ -179,7 +179,7 @@ setup: filter: term: job_id: index-layout-job2 - - match: {count: 3} + - match: {count: 4} - do: headers: @@ -236,7 +236,7 @@ setup: Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser count: index: .ml-anomalies-shared - - match: {count: 3} + - match: {count: 4} - do: @@ -251,7 +251,7 @@ setup: term: job_id: index-layout-job2 - - match: {count: 3} + - match: {count: 4} - do: headers: @@ -265,7 +265,7 @@ setup: term: job_id: index-layout-job2 - - match: {count: 3} + - match: {count: 4} - do: ml.delete_job: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml index b8f1c3df0ca9d..e8d4dcfd77cdc 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_get_stats.yml @@ -98,6 +98,8 @@ setup: - is_true: jobs.0.node.transport_address - match: { jobs.0.node.attributes.ml\.max_open_jobs: "20"} - is_true: jobs.0.open_time + - match: { jobs.0.timing_stats.job_id: job-stats-test } + - gte: { jobs.0.timing_stats.bucket_count: 0 } --- "Test get job stats for closed job": @@ -130,6 +132,8 @@ setup: - match: { jobs.0.state: closed } - is_false: jobs.0.node - is_false: jobs.0.open_time + - match: { jobs.0.timing_stats.job_id: job-stats-test } + - gte: { jobs.0.timing_stats.bucket_count: 0 } --- "Test get job stats of datafeed job that has not received any data": @@ -142,6 +146,8 @@ setup: - match: { jobs.0.model_size_stats.model_bytes : 0 } - match: { jobs.0.state: opened } - is_true: jobs.0.open_time + - match: { jobs.0.timing_stats.job_id: jobs-get-stats-datafeed-job } + - match: { jobs.0.timing_stats.bucket_count: 0 } --- "Test get all job stats with _all": @@ -320,6 +326,8 @@ setup: - match: { jobs.0.state: closed } - is_false: jobs.0.node - is_false: jobs.0.open_time + - match: { jobs.0.timing_stats.job_id: job-stats-test } + - gte: { jobs.0.timing_stats.bucket_count: 0 } - match: { jobs.1.job_id : jobs-get-stats-datafeed-job } - match: { jobs.1.data_counts.processed_record_count: 0 } - match: { jobs.1.data_counts.processed_field_count: 0 } @@ -328,3 +336,5 @@ setup: - match: { jobs.1.state: closed } - is_false: jobs.1.node - is_false: jobs.1.open_time + - match: { jobs.1.timing_stats.job_id: jobs-get-stats-datafeed-job } + - gte: { jobs.1.timing_stats.bucket_count: 0 }