diff --git a/docs/development/extensions-contrib/prometheus.md b/docs/development/extensions-contrib/prometheus.md index 21d58b281435..b6f71b7a8f75 100644 --- a/docs/development/extensions-contrib/prometheus.md +++ b/docs/development/extensions-contrib/prometheus.md @@ -29,8 +29,7 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md# This extension exposes [Druid metrics](https://druid.apache.org/docs/latest/operations/metrics.html) for collection by a Prometheus server (https://prometheus.io/). -Emitter is enabled by setting `druid.emitter=prometheus` [configs](https://druid.apache.org/docs/latest/configuration/index.html#enabling-metrics) or include `prometheus` in the composing emitter list. - +Emitter is enabled by setting `druid.emitter=prometheus` [configs](https://druid.apache.org/docs/latest/configuration/index.html#enabling-metrics) or include `prometheus` in the composing emitter list. ## Configuration @@ -47,6 +46,8 @@ All the configuration parameters for the Prometheus emitter are under `druid.emi | `druid.emitter.prometheus.pushGatewayAddress` | Pushgateway address. Required if using `pushgateway` strategy. | no | none | | `druid.emitter.prometheus.flushPeriod` | Emit metrics to Pushgateway every `flushPeriod` seconds. Required if `pushgateway` strategy is used. | no | 15 | | `druid.emitter.prometheus.extraLabels` | JSON key-value pairs for additional labels on all metrics. Keys (label names) must match the regex `[a-zA-Z_:][a-zA-Z0-9_:]*`. Example: `{"cluster_name": "druid_cluster1", "env": "staging"}`. | no | none | +| `druid.emitter.prometheus.deletePushGatewayMetricsOnShutdown` | Flag to delete metrics from Pushgateway on task shutdown. Works only if `pushgateway` strategy is used. This feature allows to delete a stale metrics from batch executed tasks. Otherwise, the Pushgateway will store these stale metrics indefinitely as there is [no time to live mechanism](https://github.com/prometheus/pushgateway/issues/117), using the memory to hold data that was already scraped by Prometheus. | no | false | +| `druid.emitter.prometheus.waitForShutdownDelay` | Time in milliseconds to wait for peon tasks to delete metrics from the Pushgateway on shutdown (e.g. 60_000). Applicable only when `pushgateway` strategy is used and `pushGatewayDeleteOnShutdown` is set to true. Be aware that there's no guarantee that a peon task will delete metrics from the gateway if the configured delay is more than the [Peon's `druid.indexer.task.gracefulShutdownTimeout`](https://druid.apache.org/docs/latest/configuration/#additional-peon-configuration). It is recommended that this value is 1.2 times the configured Prometheus `scrape_interval` of Pushgateway. This ensures, that the metrics should be scraped before the cleanup. | no | none | ### Ports for colocated Druid processes @@ -110,5 +111,5 @@ the service name. For example: "druid/coordinator-segment/count" : { "dimensions" : ["dataSource"], "type" : "gauge" }, "druid/historical-segment/count" : { "dimensions" : ["dataSource", "tier", "priority"], "type" : "gauge" } ``` - + For most use cases, the default mapping is sufficient. diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java index 9abff464432a..4426ac30a967 100644 --- a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java +++ b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitter.java @@ -69,7 +69,13 @@ public PrometheusEmitter(PrometheusEmitterConfig config) { this.config = config; this.strategy = config.getStrategy(); - metrics = new Metrics(config.getNamespace(), config.getDimensionMapPath(), config.isAddHostAsLabel(), config.isAddServiceAsLabel(), config.getExtraLabels()); + metrics = new Metrics( + config.getNamespace(), + config.getDimensionMapPath(), + config.isAddHostAsLabel(), + config.isAddServiceAsLabel(), + config.getExtraLabels() + ); } @@ -164,7 +170,8 @@ private void emitMetric(ServiceMetricEvent metricEvent) } else if (metric.getCollector() instanceof Gauge) { ((Gauge) metric.getCollector()).labels(labelValues).set(value.doubleValue()); } else if (metric.getCollector() instanceof Histogram) { - ((Histogram) metric.getCollector()).labels(labelValues).observe(value.doubleValue() / metric.getConversionFactor()); + ((Histogram) metric.getCollector()).labels(labelValues) + .observe(value.doubleValue() / metric.getConversionFactor()); } else { log.error("Unrecognized metric type [%s]", metric.getCollector().getClass()); } @@ -202,11 +209,36 @@ public void close() { if (strategy.equals(PrometheusEmitterConfig.Strategy.exporter)) { if (server != null) { - server.stop(); + server.close(); } } else { exec.shutdownNow(); flush(); + + try { + if (config.getWaitForShutdownDelay().getMillis() > 0) { + log.info("Waiting [%s]ms before deleting metrics from the push gateway.", config.getWaitForShutdownDelay().getMillis()); + Thread.sleep(config.getWaitForShutdownDelay().getMillis()); + } + } + catch (InterruptedException e) { + log.error(e, "Interrupted while waiting for shutdown delay. Deleting metrics from the push gateway now."); + } + finally { + deletePushGatewayMetrics(); + } + } + } + + private void deletePushGatewayMetrics() + { + if (pushGateway != null && config.isDeletePushGatewayMetricsOnShutdown()) { + try { + pushGateway.delete(config.getNamespace(), ImmutableMap.of(config.getNamespace(), identifier)); + } + catch (IOException e) { + log.error(e, "Unable to delete prometheus metrics from push gateway"); + } } } diff --git a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java index 118eb03a1de6..bddea2a38488 100644 --- a/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java +++ b/extensions-contrib/prometheus-emitter/src/main/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfig.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.StringUtils; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Collections; @@ -70,6 +71,12 @@ public class PrometheusEmitterConfig @JsonProperty private final Map extraLabels; + @JsonProperty + private final boolean deletePushGatewayMetricsOnShutdown; + + @JsonProperty + private final Duration waitForShutdownDelay; + @JsonCreator public PrometheusEmitterConfig( @JsonProperty("strategy") @Nullable Strategy strategy, @@ -80,7 +87,9 @@ public PrometheusEmitterConfig( @JsonProperty("addHostAsLabel") boolean addHostAsLabel, @JsonProperty("addServiceAsLabel") boolean addServiceAsLabel, @JsonProperty("flushPeriod") Integer flushPeriod, - @JsonProperty("extraLabels") @Nullable Map extraLabels + @JsonProperty("extraLabels") @Nullable Map extraLabels, + @JsonProperty("deletePushGatewayMetricsOnShutdown") @Nullable Boolean deletePushGatewayMetricsOnShutdown, + @JsonProperty("waitForShutdownDelay") @Nullable Long waitForShutdownDelay ) { this.strategy = strategy != null ? strategy : Strategy.exporter; @@ -103,6 +112,23 @@ public PrometheusEmitterConfig( this.addHostAsLabel = addHostAsLabel; this.addServiceAsLabel = addServiceAsLabel; this.extraLabels = extraLabels != null ? extraLabels : Collections.emptyMap(); + this.deletePushGatewayMetricsOnShutdown = deletePushGatewayMetricsOnShutdown != null && deletePushGatewayMetricsOnShutdown; + + if (waitForShutdownDelay == null) { + this.waitForShutdownDelay = Duration.ZERO; + } else if (waitForShutdownDelay >= 0) { + this.waitForShutdownDelay = Duration.millis(waitForShutdownDelay); + } else { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + StringUtils.format( + "Invalid value for waitForShutdownDelay[%s] specified, waitForShutdownDelay must be >= 0.", + waitForShutdownDelay + ) + ); + } + // Validate label names early to prevent Prometheus exceptions later. for (String key : this.extraLabels.keySet()) { if (!PATTERN.matcher(key).matches()) { @@ -165,6 +191,16 @@ public Map getExtraLabels() return extraLabels; } + public boolean isDeletePushGatewayMetricsOnShutdown() + { + return deletePushGatewayMetricsOnShutdown; + } + + public Duration getWaitForShutdownDelay() + { + return waitForShutdownDelay; + } + public enum Strategy { exporter, pushgateway diff --git a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfigTest.java b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfigTest.java index f7c85ad471f2..fcb849c9762a 100644 --- a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfigTest.java +++ b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterConfigTest.java @@ -39,7 +39,7 @@ public void testEmitterConfigWithBadExtraLabels() // Expect an exception thrown by our own PrometheusEmitterConfig due to invalid label key Exception exception = Assert.assertThrows(DruidException.class, () -> { - new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels); + new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels, false, null); }); String expectedMessage = "Invalid metric label name [label Name]. Label names must conform to the pattern [[a-zA-Z_:][a-zA-Z0-9_:]*]"; diff --git a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java index ca1f63ba34c2..1e9c49346362 100644 --- a/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java +++ b/extensions-contrib/prometheus-emitter/src/test/java/org/apache/druid/emitter/prometheus/PrometheusEmitterTest.java @@ -25,6 +25,7 @@ import io.prometheus.client.exporter.PushGateway; import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -34,6 +35,7 @@ import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.mock; public class PrometheusEmitterTest @@ -42,7 +44,7 @@ public class PrometheusEmitterTest public void testEmitterWithServiceLabel() { CollectorRegistry.defaultRegistry.clear(); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, null); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, null, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -63,7 +65,7 @@ public void testEmitterWithServiceLabel() public void testEmitterWithServiceAndHostLabel() { CollectorRegistry.defaultRegistry.clear(); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, true, true, 60, null); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, true, true, 60, null, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -86,7 +88,7 @@ public void testEmitterWithExtraLabels() CollectorRegistry.defaultRegistry.clear(); Map extraLabels = new HashMap<>(); extraLabels.put("labelName", "labelValue"); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, false, 60, extraLabels); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, false, 60, extraLabels, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -108,7 +110,7 @@ public void testEmitterWithServiceLabelAndExtraLabel() CollectorRegistry.defaultRegistry.clear(); Map extraLabels = new HashMap<>(); extraLabels.put("labelName", "labelValue"); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -129,7 +131,7 @@ public void testEmitterWithEmptyExtraLabels() { CollectorRegistry.defaultRegistry.clear(); Map extraLabels = new HashMap<>(); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -152,7 +154,7 @@ public void testEmitterWithMultipleExtraLabels() Map extraLabels = new HashMap<>(); extraLabels.put("labelName1", "labelValue1"); extraLabels.put("labelName2", "labelValue2"); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -175,7 +177,7 @@ public void testEmitterWithLabelCollision() // ExtraLabels contains a label that collides with a service label Map extraLabels = new HashMap<>(); extraLabels.put("server", "collisionLabelValue"); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, null, null, 0, null, false, true, 60, extraLabels, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -202,7 +204,7 @@ public void testEmitterWithLabelCollision() public void testEmitterMetric() { CollectorRegistry.defaultRegistry.clear(); - PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace", null, 0, "pushgateway", true, true, 60, null); + PrometheusEmitterConfig config = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace", null, 0, "pushgateway", true, true, 60, null, false, null); PrometheusEmitterModule prometheusEmitterModule = new PrometheusEmitterModule(); Emitter emitter = prometheusEmitterModule.getEmitter(config); ServiceMetricEvent build = ServiceMetricEvent.builder() @@ -223,12 +225,12 @@ public void testEmitterMetric() @Test public void testEmitterStart() { - PrometheusEmitterConfig exportEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "namespace1", null, 0, null, true, true, 60, null); + PrometheusEmitterConfig exportEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.exporter, "namespace1", null, 0, null, true, true, 60, null, false, null); PrometheusEmitter exportEmitter = new PrometheusEmitter(exportEmitterConfig); exportEmitter.start(); Assert.assertNotNull(exportEmitter.getServer()); - PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace2", null, 0, "pushgateway", true, true, 60, null); + PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace2", null, 0, "pushgateway", true, true, 60, null, false, null); PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig); pushEmitter.start(); Assert.assertNotNull(pushEmitter.getPushGateway()); @@ -237,7 +239,7 @@ public void testEmitterStart() @Test public void testEmitterPush() throws IOException { - PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway", true, true, 60, null); + PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway", true, true, 60, null, false, null); PushGateway mockPushGateway = mock(PushGateway.class); mockPushGateway.push(anyObject(Collector.class), anyString(), anyObject(ImmutableMap.class)); @@ -266,6 +268,8 @@ public void testEmitterConfigCreationWithNullAsAddress() true, true, 60, + null, + false, null ); @@ -281,6 +285,8 @@ public void testEmitterConfigCreationWithNullAsAddress() true, true, 50, + null, + false, null ) ); @@ -289,7 +295,7 @@ public void testEmitterConfigCreationWithNullAsAddress() @Test public void testEmitterStartWithHttpUrl() { - PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace4", null, 0, "http://pushgateway", true, true, 60, null); + PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace4", null, 0, "http://pushgateway", true, true, 60, null, false, null); PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig); pushEmitter.start(); Assert.assertNotNull(pushEmitter.getPushGateway()); @@ -298,7 +304,7 @@ public void testEmitterStartWithHttpUrl() @Test public void testEmitterStartWithHttpsUrl() { - PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace5", null, 0, "https://pushgateway", true, true, 60, null); + PrometheusEmitterConfig pushEmitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace5", null, 0, "https://pushgateway", true, true, 60, null, false, null); PrometheusEmitter pushEmitter = new PrometheusEmitter(pushEmitterConfig); pushEmitter.start(); Assert.assertNotNull(pushEmitter.getPushGateway()); @@ -319,6 +325,8 @@ public void testEmitterConfig() true, true, 60, + null, + false, null ) ); @@ -333,7 +341,88 @@ public void testEmitterConfig() true, true, 60, + null, + false, null ); } + + @Test + public void testEmitterWithDeleteOnShutdown() throws IOException + { + PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway", true, true, 60, null, true, null); + + PushGateway mockPushGateway = mock(PushGateway.class); + mockPushGateway.push(anyObject(CollectorRegistry.class), anyString(), anyObject(ImmutableMap.class)); + expectLastCall().atLeastOnce(); + mockPushGateway.delete(anyString(), anyObject(ImmutableMap.class)); + expectLastCall().once(); + + EasyMock.replay(mockPushGateway); + + PrometheusEmitter emitter = new PrometheusEmitter(emitterConfig); + emitter.start(); + emitter.setPushGateway(mockPushGateway); + ServiceMetricEvent build = ServiceMetricEvent.builder() + .setDimension("task", "index_parallel") + .setMetric("task/run/time", 500) + .build(ImmutableMap.of("service", "peon", "host", "druid.test.cn")); + emitter.emit(build); + emitter.flush(); + emitter.close(); + + EasyMock.verify(mockPushGateway); + } + + @Test + public void testEmitterWithDeleteOnShutdownAndWait() throws IOException + { + PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace6", null, 0, "pushgateway", true, true, 60, null, true, 1_000L); + + PushGateway mockPushGateway = mock(PushGateway.class); + mockPushGateway.push(anyObject(CollectorRegistry.class), anyString(), anyObject(ImmutableMap.class)); + expectLastCall().atLeastOnce(); + mockPushGateway.delete(anyString(), anyObject(ImmutableMap.class)); + expectLastCall().once(); + + EasyMock.replay(mockPushGateway); + + PrometheusEmitter emitter = new PrometheusEmitter(emitterConfig); + emitter.start(); + emitter.setPushGateway(mockPushGateway); + ServiceMetricEvent build = ServiceMetricEvent.builder() + .setDimension("task", "index_parallel") + .setMetric("task/run/time", 500) + .build(ImmutableMap.of("service", "peon", "host", "druid.test.cn")); + emitter.emit(build); + emitter.flush(); + emitter.close(); + + EasyMock.verify(mockPushGateway); + } + + @Test + public void testEmitterWithoutDeleteOnShutdown() throws IOException + { + PrometheusEmitterConfig emitterConfig = new PrometheusEmitterConfig(PrometheusEmitterConfig.Strategy.pushgateway, "namespace3", null, 0, "pushgateway", true, true, 60, null, false, null); + + PushGateway mockPushGateway = mock(PushGateway.class); + mockPushGateway.push(anyObject(CollectorRegistry.class), anyString(), anyObject(ImmutableMap.class)); + expectLastCall().atLeastOnce(); + + EasyMock.replay(mockPushGateway); + + PrometheusEmitter emitter = new PrometheusEmitter(emitterConfig); + emitter.start(); + emitter.setPushGateway(mockPushGateway); + ServiceMetricEvent build = ServiceMetricEvent.builder() + .setDimension("task", "index_parallel") + .setMetric("task/run/time", 500) + .build(ImmutableMap.of("service", "peon", "host", "druid.test.cn")); + emitter.emit(build); + emitter.flush(); + emitter.close(); + + EasyMock.verify(mockPushGateway); + } }