From 9d655a0de72e8dd7943b8c92841945aa72f39a4e Mon Sep 17 00:00:00 2001 From: chengxy Date: Tue, 18 Jul 2023 11:39:57 +0800 Subject: [PATCH] add test metrics --- .../runtime/boot/EventBusListener.java | 2 +- .../runtime/boot/EventRuleTransfer.java | 6 ++ .../eventbridge/metrics/NopLongCounter.java | 16 +---- .../eventbridge/BridgeMetricsConstant.java | 25 ++++--- .../eventbridge/BridgeMetricsManager.java | 66 ++++++++----------- 5 files changed, 53 insertions(+), 62 deletions(-) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index 3b2fb45c..ee86b70c 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -61,7 +61,7 @@ public void run() { List pullRecordList = Lists.newArrayList(); try { pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>()); - BridgeMetricsManager.messagesInTotal.add(pullRecordList.size()); + metricsManager.eventbusInEventsTotal(pullRecordList.size()); if (CollectionUtils.isEmpty(pullRecordList)) { this.waitForRunning(1000); continue; diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 2621c14d..2939dc66 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -73,6 +73,7 @@ public void run() { List afterTransformConnect= Lists.newArrayList(); while (!stopped) { try { + long startTime = System.currentTimeMillis(); Map> eventRecordMap = circulatorContext.takeEventRecords(batchSize); if (MapUtils.isEmpty(eventRecordMap)) { logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); @@ -108,10 +109,15 @@ public void run() { completableFutures.add(transformFuture); }); } + long endTime = System.currentTimeMillis(); + long latency = endTime - startTime; CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); + //success + metricsManager.eventRuleLatencySeconds(latency); logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); } catch (Exception exception) { + //failed logger.error("transfer event record failed, stackTrace-", exception); afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception)); } diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java index fe84f98a..536610bc 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java @@ -17,20 +17,8 @@ package org.apache.rocketmq.eventbridge.metrics; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.context.Context; +import io.opentelemetry.api.metrics.ObservableLongCounter; -public class NopLongCounter implements LongCounter { - @Override public void add(long l) { +public class NopLongCounter implements ObservableLongCounter { - } - - @Override public void add(long l, Attributes attributes) { - - } - - @Override public void add(long l, Attributes attributes, Context context) { - - } } diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java index 1a3c1c27..86536968 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java @@ -17,19 +17,28 @@ package org.apache.rocketmq.eventbridge; +import java.util.HashMap; +import java.util.Map; + public class BridgeMetricsConstant { public static final String OPEN_TELEMETRY_METER_NAME = "bridge-meter"; - - public static final String GAUGE_PROCESSOR_GAUGE = "target_queue_gauge"; - public static final String RULE_QUEUE_GAUGE = "rule_queue_gauge"; - - public static final String COUNTER_MESSAGES_IN_TOTAL = "eventbridge_messages_in_total"; - public static final String COUNTER_MESSAGES_OUT_TOTAL = "eventbridge_messages_out_total"; - public static final String COUNTER_THROUGHPUT_IN_TOTAL = "eventbridge_throughput_in_total"; - public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "eventbridge_throughput_out_total"; public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size"; + public static final String EVENTBUS_IN_EVENTS_TOTAL = "eventbridge_eventbus_in_events_total"; + public static final String EVENTRULE_LATENCY_SECONDS = "eventbridge_eventrule_latency_seconds"; /** eventbridge process message latency**/ public static final String HISTOGRAM_RPC_LATENCY = "process_latency"; + + public static final Map ACCOUNT_LABELS = new HashMap() { + { put("account_id", "account id"); } + { put("runnerName", "runnerName"); } + { put("status", "status"); } + }; + + public static final Map RUNNER_NAME_LABELS = new HashMap() { + { put("account_id", "account id"); } + { put("runnerName", "runner name"); } + }; + } diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index 216a23d5..dc81663c 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; @@ -66,7 +67,6 @@ public class BridgeMetricsManager { private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMetricsManager.class); private BridgeConfig bridgeConfig; - private final static Map LABEL_MAP = new HashMap<>(); private OtlpGrpcMetricExporter metricExporter; private PeriodicMetricReader periodicMetricReader; private PrometheusHttpServer prometheusHttpServer; @@ -75,17 +75,10 @@ public class BridgeMetricsManager { // queue stats metrics public static ObservableLongGauge targetGauge = new NopObservableLongGauge(); - public static ObservableLongGauge ruleGauge = new NopObservableLongGauge(); - - //invoke timeout public static LongHistogram invokeLatency = new NopLongHistogram(); - // request metrics - public static LongCounter messagesInTotal = new NopLongCounter(); - public static LongCounter messagesOutTotal = new NopLongCounter(); - public static LongCounter throughputInTotal = new NopLongCounter(); - public static LongCounter throughputOutTotal = new NopLongCounter(); + public static ObservableLongCounter messagesOutTotal = new NopLongCounter(); public static LongHistogram messageSize = new NopLongHistogram(); public BridgeMetricsManager() { @@ -109,10 +102,9 @@ private void initMetricsProperties() { } } - - public static AttributesBuilder newAttributesBuilder() { + public static AttributesBuilder newAttributesBuilder(Map labels) { AttributesBuilder attributesBuilder = Attributes.builder(); - LABEL_MAP.forEach(attributesBuilder::put); + labels.forEach(attributesBuilder::put); return attributesBuilder; } @@ -249,50 +241,46 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { } - public void initTriggerMetrics(List eventTargetQueue, String label) { + public AttributesBuilder addGroup(Map labels ) { + return newAttributesBuilder(labels); + } - targetGauge = bridgeMeter.gaugeBuilder(GAUGE_PROCESSOR_GAUGE) - .setDescription("Request processor gauge") - .ofLongs() + private void countMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) { + messagesOutTotal = bridgeMeter.counterBuilder(metricsName) + .setDescription("Total number of outgoing messages") .buildWithCallback(measurement -> { - measurement.record(eventTargetQueue.size(), newAttributesBuilder().put(label, "target").build()); + measurement.record(count, attributesBuilder.build()); }); + } - public void initRuleMetrics(List eventRuleQueue, String label) { - ruleGauge = bridgeMeter.gaugeBuilder(RULE_QUEUE_GAUGE) - .setDescription("Request processor gauge") + private void gaugeMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) { + targetGauge = bridgeMeter.gaugeBuilder(metricsName) + .setDescription("Gauge of total messages ") .ofLongs() - .buildWithCallback(measurement -> { - measurement.record(eventRuleQueue.size(), newAttributesBuilder().put(label, "rule").build()); + .buildWithCallback( measurement -> { + measurement.record(count, attributesBuilder.build()); }); - } - private void initRequestMetrics() { - messagesInTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_IN_TOTAL) - .setDescription("Total number of incoming messages") - .build(); - - messagesOutTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL) - .setDescription("Total number of outgoing messages") - .build(); - - throughputInTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_IN_TOTAL) - .setDescription("Total traffic of incoming messages") - .build(); + public void eventbusInEventsTotal(long count) { + AttributesBuilder attributesBuilder = addGroup(ACCOUNT_LABELS); + countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder); + } - throughputOutTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_OUT_TOTAL) - .setDescription("Total traffic of outgoing messages") - .build(); + public void eventRuleLatencySeconds(long latency) { + AttributesBuilder attributesBuilder = addGroup(RUNNER_NAME_LABELS); + gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder); + } + public void initRequestMetrics() { messageSize = bridgeMeter.histogramBuilder(HISTOGRAM_MESSAGE_SIZE) .setDescription("Incoming messages size") .ofLongs() .build(); } - public static void initRuleMetrics(Meter meter) { + public void initRuleMetrics(Meter meter) { invokeLatency = meter.histogramBuilder(HISTOGRAM_RPC_LATENCY) .setDescription("invoke latency") .setUnit("milliseconds")