From 47d1795a34ad84488f446493acf5d34fbd358df7 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 10 Oct 2019 11:38:56 -0400 Subject: [PATCH 1/2] [ML][Inference] add inference processors and trained models to usage --- .../ml/MachineLearningFeatureSetUsage.java | 13 ++ .../MachineLearningUsageTransportAction.java | 123 +++++++++++++++++- ...chineLearningInfoTransportActionTests.java | 109 ++++++++++++++++ 3 files changed, 239 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java index 4acfe8f091cb3..d57235f15609f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java @@ -29,10 +29,12 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { public static final String CREATED_BY = "created_by"; public static final String NODE_COUNT = "node_count"; public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs"; + public static final String INFERENCE_FIELD = "inference"; private final Map jobsUsage; private final Map datafeedsUsage; private final Map analyticsUsage; + private final Map inferenceUsage; private final int nodeCount; public MachineLearningFeatureSetUsage(boolean available, @@ -40,11 +42,13 @@ public MachineLearningFeatureSetUsage(boolean available, Map jobsUsage, Map datafeedsUsage, Map analyticsUsage, + Map inferenceUsage, int nodeCount) { super(XPackField.MACHINE_LEARNING, available, enabled); this.jobsUsage = Objects.requireNonNull(jobsUsage); this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage); this.analyticsUsage = Objects.requireNonNull(analyticsUsage); + this.inferenceUsage = Objects.requireNonNull(inferenceUsage); this.nodeCount = nodeCount; } @@ -57,6 +61,11 @@ public MachineLearningFeatureSetUsage(StreamInput in) throws IOException { } else { this.analyticsUsage = Collections.emptyMap(); } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.inferenceUsage = in.readMap(); + } else { + this.inferenceUsage = Collections.emptyMap(); + } this.nodeCount = in.readInt(); } @@ -68,6 +77,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_4_0)) { out.writeMap(analyticsUsage); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeMap(inferenceUsage); + } out.writeInt(nodeCount); } @@ -77,6 +89,7 @@ protected void innerXContent(XContentBuilder builder, Params params) throws IOEx builder.field(JOBS_FIELD, jobsUsage); builder.field(DATAFEEDS_FIELD, datafeedsUsage); builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage); + builder.field(INFERENCE_FIELD, inferenceUsage); if (nodeCount >= 0) { builder.field(NODE_COUNT, nodeCount); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java index ab815e17fe0c8..74e9714cc6602 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java @@ -7,6 +7,12 @@ import org.apache.lucene.util.Counter; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -16,11 +22,13 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.env.Environment; +import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.protocol.xpack.XPackUsageRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse; @@ -32,19 +40,24 @@ import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; import org.elasticsearch.xpack.ml.job.JobManagerHolder; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class MachineLearningUsageTransportAction extends XPackUsageFeatureTransportAction { @@ -72,7 +85,7 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat ActionListener listener) { if (enabled == false) { MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), enabled, - Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0); + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0); listener.onResponse(new XPackUsageFeatureResponse(usage)); return; } @@ -80,20 +93,48 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat Map jobsUsage = new LinkedHashMap<>(); Map datafeedsUsage = new LinkedHashMap<>(); Map analyticsUsage = new LinkedHashMap<>(); + Map inferenceUsage = new LinkedHashMap<>(); int nodeCount = mlNodeCount(state); - // Step 3. Extract usage from data frame analytics stats and return usage response - ActionListener dataframeAnalyticsListener = ActionListener.wrap( + // Step 5. extract trained model config count and then return results + ActionListener trainedModelConfigCountListener = ActionListener.wrap( response -> { - addDataFrameAnalyticsUsage(response, analyticsUsage); + addTrainedModelStats(response, inferenceUsage); MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), - enabled, jobsUsage, datafeedsUsage, analyticsUsage, nodeCount); + enabled, jobsUsage, datafeedsUsage, analyticsUsage, inferenceUsage, nodeCount); listener.onResponse(new XPackUsageFeatureResponse(usage)); }, listener::onFailure ); - // Step 2. Extract usage from datafeeds stats and return usage response + // Step 4. Extract usage from ingest statistics and gather trained model config count + ActionListener nodesStatsListener = ActionListener.wrap( + response -> { + addInferenceIngestUsage(response, inferenceUsage); + SearchRequestBuilder requestBuilder = client.prepareSearch(InferenceIndexConstants.INDEX_PATTERN) + .setSize(0) + .setTrackTotalHits(true); + ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ClientHelper.ML_ORIGIN, + requestBuilder.request(), + trainedModelConfigCountListener, + client::search); + }, + listener::onFailure + ); + + // Step 3. Extract usage from data frame analytics stats and then request ingest node stats + ActionListener dataframeAnalyticsListener = ActionListener.wrap( + response -> { + addDataFrameAnalyticsUsage(response, analyticsUsage); + String[] ingestNodes = ingestNodes(state); + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(ingestNodes).clear().ingest(true); + client.execute(NodesStatsAction.INSTANCE, nodesStatsRequest, nodesStatsListener); + }, + listener::onFailure + ); + + // Step 2. Extract usage from datafeeds stats and then request stats for data frame analytics ActionListener datafeedStatsListener = ActionListener.wrap(response -> { addDatafeedsUsage(response, datafeedsUsage); @@ -227,6 +268,66 @@ private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Respons } } + private static void initializeStats(Map emptyStatsMap) { + emptyStatsMap.put("sum", 0L); + emptyStatsMap.put("min", 0L); + emptyStatsMap.put("max", 0L); + } + + private static void updateStats(Map statsMap, Long value) { + statsMap.compute("sum", (k, v) -> v + value); + statsMap.compute("min", (k, v) -> Math.min(v, value)); + statsMap.compute("max", (k, v) -> Math.max(v, value)); + } + + //TODO separate out ours and users models possibly regression vs classification + private void addTrainedModelStats(SearchResponse response, Map inferenceUsage) { + inferenceUsage.put("trained_models", + Collections.singletonMap(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getHits().getTotalHits().value))); + } + + //TODO separate out ours and users models possibly regression vs classification + private void addInferenceIngestUsage(NodesStatsResponse response, Map inferenceUsage) { + Set pipelines = new HashSet<>(); + Map docCountStats = new HashMap<>(3); + Map timeStats = new HashMap<>(3); + Map failureStats = new HashMap<>(3); + initializeStats(docCountStats); + initializeStats(timeStats); + initializeStats(failureStats); + + response.getNodes() + .stream() + .map(NodeStats::getIngestStats) + .map(IngestStats::getProcessorStats) + .forEach(map -> + map.forEach((pipelineId, processors) -> { + boolean containsInference = false; + for(IngestStats.ProcessorStat stats : processors) { + if (stats.getName().equals(InferenceProcessor.TYPE)) { + containsInference = true; + long ingestCount = stats.getStats().getIngestCount(); + long ingestTime = stats.getStats().getIngestTimeInMillis(); + long failureCount = stats.getStats().getIngestFailedCount(); + updateStats(docCountStats, ingestCount); + updateStats(timeStats, ingestTime); + updateStats(failureStats, failureCount); + } + } + if (containsInference) { + pipelines.add(pipelineId); + } + }) + ); + + Map ingestUsage = new HashMap<>(4); + ingestUsage.put("pipelines", createCountUsageEntry(pipelines.size())); + ingestUsage.put("docs", docCountStats); + ingestUsage.put("time", timeStats); + ingestUsage.put("failures", failureStats); + inferenceUsage.put("processors", Collections.singletonMap(MachineLearningFeatureSetUsage.ALL, ingestUsage)); + } + private static int mlNodeCount(final ClusterState clusterState) { int mlNodeCount = 0; for (DiscoveryNode node : clusterState.getNodes()) { @@ -236,4 +337,14 @@ private static int mlNodeCount(final ClusterState clusterState) { } return mlNodeCount; } + + private static String[] ingestNodes(final ClusterState clusterState) { + String[] ingestNodes = new String[clusterState.nodes().getIngestNodes().size()]; + Iterator nodeIterator = clusterState.nodes().getIngestNodes().keysIt(); + int i = 0; + while(nodeIterator.hasNext()) { + ingestNodes[i++] = nodeIterator.next(); + } + return ingestNodes; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java index b2f69158aca05..45a8f145b192f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java @@ -5,11 +5,19 @@ */ package org.elasticsearch.xpack.ml; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; @@ -20,13 +28,18 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackFeatureSet; import org.elasticsearch.xpack.core.XPackField; @@ -41,6 +54,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; +import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -50,10 +64,12 @@ import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; +import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.JobManagerHolder; import org.junit.Before; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -62,6 +78,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; @@ -99,6 +117,8 @@ public void init() { givenJobs(Collections.emptyList(), Collections.emptyList()); givenDatafeeds(Collections.emptyList()); givenDataFrameAnalytics(Collections.emptyList()); + givenProcessorStats(Collections.emptyList()); + givenTrainedModelConfigCount(0); } private MachineLearningUsageTransportAction newUsageAction(Settings settings) { @@ -169,12 +189,50 @@ public void testUsage() throws Exception { buildDatafeedStats(DatafeedState.STARTED), buildDatafeedStats(DatafeedState.STOPPED) )); + givenDataFrameAnalytics(Arrays.asList( buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED), buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED), buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED) )); + givenProcessorStats(Arrays.asList( + buildNodeStats( + Arrays.asList("pipeline1", "pipeline2", "pipeline3"), + Arrays.asList( + Arrays.asList( + new IngestStats.ProcessorStat(InferenceProcessor.TYPE, new IngestStats.Stats(10, 1, 0, 0)), + new IngestStats.ProcessorStat("grok", new IngestStats.Stats(10, 1, 0, 0)), + new IngestStats.ProcessorStat(InferenceProcessor.TYPE, new IngestStats.Stats(100, 10, 0, 1)) + ), + Arrays.asList( + new IngestStats.ProcessorStat(InferenceProcessor.TYPE, new IngestStats.Stats(5, 1, 0, 0)), + new IngestStats.ProcessorStat("grok", new IngestStats.Stats(10, 1, 0, 0)) + ), + Arrays.asList( + new IngestStats.ProcessorStat("grok", new IngestStats.Stats(10, 1, 0, 0)) + ) + )), + buildNodeStats( + Arrays.asList("pipeline1", "pipeline2", "pipeline3"), + Arrays.asList( + Arrays.asList( + new IngestStats.ProcessorStat(InferenceProcessor.TYPE, new IngestStats.Stats(0, 0, 0, 0)), + new IngestStats.ProcessorStat("grok", new IngestStats.Stats(0, 0, 0, 0)), + new IngestStats.ProcessorStat(InferenceProcessor.TYPE, new IngestStats.Stats(10, 1, 0, 0)) + ), + Arrays.asList( + new IngestStats.ProcessorStat(InferenceProcessor.TYPE, new IngestStats.Stats(5, 1, 0, 0)), + new IngestStats.ProcessorStat("grok", new IngestStats.Stats(10, 1, 0, 0)) + ), + Arrays.asList( + new IngestStats.ProcessorStat("grok", new IngestStats.Stats(10, 1, 0, 0)) + ) + )) + )); + givenTrainedModelConfigCount(100); + + var usageAction = newUsageAction(settings.build()); PlainActionFuture future = new PlainActionFuture<>(); usageAction.masterOperation(null, null, ClusterState.EMPTY_STATE, future); @@ -251,6 +309,18 @@ public void testUsage() throws Exception { assertThat(source.getValue("jobs.opened.forecasts.total"), equalTo(11)); assertThat(source.getValue("jobs.opened.forecasts.forecasted_jobs"), equalTo(2)); + + assertThat(source.getValue("inference.trained_models._all.count"), equalTo(100)); + assertThat(source.getValue("inference.processors._all.pipelines.count"), equalTo(2)); + assertThat(source.getValue("inference.processors._all.docs.sum"), equalTo(130)); + assertThat(source.getValue("inference.processors._all.docs.min"), equalTo(0)); + assertThat(source.getValue("inference.processors._all.docs.max"), equalTo(100)); + assertThat(source.getValue("inference.processors._all.time.sum"), equalTo(14)); + assertThat(source.getValue("inference.processors._all.time.min"), equalTo(0)); + assertThat(source.getValue("inference.processors._all.time.max"), equalTo(10)); + assertThat(source.getValue("inference.processors._all.failures.sum"), equalTo(1)); + assertThat(source.getValue("inference.processors._all.failures.min"), equalTo(0)); + assertThat(source.getValue("inference.processors._all.failures.max"), equalTo(1)); } } @@ -417,6 +487,34 @@ private void givenDataFrameAnalytics(List stats) { + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new NodesStatsResponse(new ClusterName("_name"), stats, Collections.emptyList())); + return Void.TYPE; + }).when(client).execute(same(NodesStatsAction.INSTANCE), any(), any()); + } + + private void givenTrainedModelConfigCount(long count) { + when(client.prepareSearch(InferenceIndexConstants.INDEX_PATTERN)) + .thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE)); + ThreadPool pool = mock(ThreadPool.class); + when(pool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(pool); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[1]; + SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(count, TotalHits.Relation.EQUAL_TO), (float)0.0); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + listener.onResponse(searchResponse); + return Void.TYPE; + }).when(client).search(any(), any()); + } + private static Detector buildMinDetector(String fieldName) { Detector.Builder detectorBuilder = new Detector.Builder(); detectorBuilder.setFunction("min"); @@ -463,6 +561,17 @@ private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAna return stats; } + private static NodeStats buildNodeStats(List pipelineNames, List> processorStats) { + IngestStats ingestStats = new IngestStats( + new IngestStats.Stats(0,0,0,0), + Collections.emptyList(), + IntStream.range(0, pipelineNames.size()).boxed().collect(Collectors.toMap(pipelineNames::get, processorStats::get))); + return new NodeStats(mock(DiscoveryNode.class), + Instant.now().toEpochMilli(), null, null, null, null, null, null, null, null, + null, null, null, ingestStats, null); + + } + private static ForecastStats buildForecastStats(long numberOfForecasts) { return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts); } From 12266b848b19cff84c82495c7c48eda8d66387b1 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 30 Oct 2019 11:32:36 -0400 Subject: [PATCH 2/2] renaming usage fields --- .../MachineLearningUsageTransportAction.java | 10 +++++----- ...chineLearningInfoTransportActionTests.java | 20 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java index 74e9714cc6602..4f731d9804c4b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java @@ -320,12 +320,12 @@ private void addInferenceIngestUsage(NodesStatsResponse response, Map ingestUsage = new HashMap<>(4); + Map ingestUsage = new HashMap<>(6); ingestUsage.put("pipelines", createCountUsageEntry(pipelines.size())); - ingestUsage.put("docs", docCountStats); - ingestUsage.put("time", timeStats); - ingestUsage.put("failures", failureStats); - inferenceUsage.put("processors", Collections.singletonMap(MachineLearningFeatureSetUsage.ALL, ingestUsage)); + ingestUsage.put("num_docs_processed", docCountStats); + ingestUsage.put("time_ms", timeStats); + ingestUsage.put("num_failures", failureStats); + inferenceUsage.put("ingest_processors", Collections.singletonMap(MachineLearningFeatureSetUsage.ALL, ingestUsage)); } private static int mlNodeCount(final ClusterState clusterState) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java index 45a8f145b192f..3659778222c87 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java @@ -311,16 +311,16 @@ public void testUsage() throws Exception { assertThat(source.getValue("jobs.opened.forecasts.forecasted_jobs"), equalTo(2)); assertThat(source.getValue("inference.trained_models._all.count"), equalTo(100)); - assertThat(source.getValue("inference.processors._all.pipelines.count"), equalTo(2)); - assertThat(source.getValue("inference.processors._all.docs.sum"), equalTo(130)); - assertThat(source.getValue("inference.processors._all.docs.min"), equalTo(0)); - assertThat(source.getValue("inference.processors._all.docs.max"), equalTo(100)); - assertThat(source.getValue("inference.processors._all.time.sum"), equalTo(14)); - assertThat(source.getValue("inference.processors._all.time.min"), equalTo(0)); - assertThat(source.getValue("inference.processors._all.time.max"), equalTo(10)); - assertThat(source.getValue("inference.processors._all.failures.sum"), equalTo(1)); - assertThat(source.getValue("inference.processors._all.failures.min"), equalTo(0)); - assertThat(source.getValue("inference.processors._all.failures.max"), equalTo(1)); + assertThat(source.getValue("inference.ingest_processors._all.pipelines.count"), equalTo(2)); + assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.sum"), equalTo(130)); + assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.min"), equalTo(0)); + assertThat(source.getValue("inference.ingest_processors._all.num_docs_processed.max"), equalTo(100)); + assertThat(source.getValue("inference.ingest_processors._all.time_ms.sum"), equalTo(14)); + assertThat(source.getValue("inference.ingest_processors._all.time_ms.min"), equalTo(0)); + assertThat(source.getValue("inference.ingest_processors._all.time_ms.max"), equalTo(10)); + assertThat(source.getValue("inference.ingest_processors._all.num_failures.sum"), equalTo(1)); + assertThat(source.getValue("inference.ingest_processors._all.num_failures.min"), equalTo(0)); + assertThat(source.getValue("inference.ingest_processors._all.num_failures.max"), equalTo(1)); } }