Skip to content

Commit

Permalink
add test metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
chengxy committed Jul 18, 2023
1 parent e641e27 commit 9d655a0
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void run() {
List<ConnectRecord> pullRecordList = Lists.newArrayList();
try {
pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>());
BridgeMetricsManager.messagesInTotal.add(pullRecordList.size());
metricsManager.eventbusInEventsTotal(pullRecordList.size());
if (CollectionUtils.isEmpty(pullRecordList)) {
this.waitForRunning(1000);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void run() {
List<ConnectRecord> afterTransformConnect= Lists.newArrayList();
while (!stopped) {
try {
long startTime = System.currentTimeMillis();
Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
if (MapUtils.isEmpty(eventRecordMap)) {
logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
Expand Down Expand Up @@ -108,10 +109,15 @@ public void run() {
completableFutures.add(transformFuture);
});
}
long endTime = System.currentTimeMillis();
long latency = endTime - startTime;
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
//success
metricsManager.eventRuleLatencySeconds(latency);
logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
//failed
logger.error("transfer event record failed, stackTrace-", exception);
afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,8 @@

package org.apache.rocketmq.eventbridge.metrics;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.context.Context;
import io.opentelemetry.api.metrics.ObservableLongCounter;

public class NopLongCounter implements LongCounter {
@Override public void add(long l) {
public class NopLongCounter implements ObservableLongCounter {

}

@Override public void add(long l, Attributes attributes) {

}

@Override public void add(long l, Attributes attributes, Context context) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,28 @@

package org.apache.rocketmq.eventbridge;

import java.util.HashMap;
import java.util.Map;

public class BridgeMetricsConstant {
public static final String OPEN_TELEMETRY_METER_NAME = "bridge-meter";

public static final String GAUGE_PROCESSOR_GAUGE = "target_queue_gauge";
public static final String RULE_QUEUE_GAUGE = "rule_queue_gauge";

public static final String COUNTER_MESSAGES_IN_TOTAL = "eventbridge_messages_in_total";
public static final String COUNTER_MESSAGES_OUT_TOTAL = "eventbridge_messages_out_total";
public static final String COUNTER_THROUGHPUT_IN_TOTAL = "eventbridge_throughput_in_total";
public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "eventbridge_throughput_out_total";
public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size";
public static final String EVENTBUS_IN_EVENTS_TOTAL = "eventbridge_eventbus_in_events_total";
public static final String EVENTRULE_LATENCY_SECONDS = "eventbridge_eventrule_latency_seconds";


/** eventbridge process message latency**/
public static final String HISTOGRAM_RPC_LATENCY = "process_latency";

public static final Map<String, String> ACCOUNT_LABELS = new HashMap<String, String>() {
{ put("account_id", "account id"); }
{ put("runnerName", "runnerName"); }
{ put("status", "status"); }
};

public static final Map<String, String> RUNNER_NAME_LABELS = new HashMap<String, String>() {
{ put("account_id", "account id"); }
{ put("runnerName", "runner name"); }
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
Expand Down Expand Up @@ -66,7 +67,6 @@ public class BridgeMetricsManager {
private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMetricsManager.class);

private BridgeConfig bridgeConfig;
private final static Map<String, String> LABEL_MAP = new HashMap<>();
private OtlpGrpcMetricExporter metricExporter;
private PeriodicMetricReader periodicMetricReader;
private PrometheusHttpServer prometheusHttpServer;
Expand All @@ -75,17 +75,10 @@ public class BridgeMetricsManager {

// queue stats metrics
public static ObservableLongGauge targetGauge = new NopObservableLongGauge();
public static ObservableLongGauge ruleGauge = new NopObservableLongGauge();


//invoke timeout
public static LongHistogram invokeLatency = new NopLongHistogram();

// request metrics
public static LongCounter messagesInTotal = new NopLongCounter();
public static LongCounter messagesOutTotal = new NopLongCounter();
public static LongCounter throughputInTotal = new NopLongCounter();
public static LongCounter throughputOutTotal = new NopLongCounter();
public static ObservableLongCounter messagesOutTotal = new NopLongCounter();
public static LongHistogram messageSize = new NopLongHistogram();

public BridgeMetricsManager() {
Expand All @@ -109,10 +102,9 @@ private void initMetricsProperties() {
}
}


public static AttributesBuilder newAttributesBuilder() {
public static AttributesBuilder newAttributesBuilder(Map<String, String> labels) {
AttributesBuilder attributesBuilder = Attributes.builder();
LABEL_MAP.forEach(attributesBuilder::put);
labels.forEach(attributesBuilder::put);
return attributesBuilder;
}

Expand Down Expand Up @@ -249,50 +241,46 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
}


public <T> void initTriggerMetrics(List<T> eventTargetQueue, String label) {
public AttributesBuilder addGroup(Map<String, String> labels ) {
return newAttributesBuilder(labels);
}

targetGauge = bridgeMeter.gaugeBuilder(GAUGE_PROCESSOR_GAUGE)
.setDescription("Request processor gauge")
.ofLongs()
private void countMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) {
messagesOutTotal = bridgeMeter.counterBuilder(metricsName)
.setDescription("Total number of outgoing messages")
.buildWithCallback(measurement -> {
measurement.record(eventTargetQueue.size(), newAttributesBuilder().put(label, "target").build());
measurement.record(count, attributesBuilder.build());
});

}

public <T> void initRuleMetrics(List<T> eventRuleQueue, String label) {
ruleGauge = bridgeMeter.gaugeBuilder(RULE_QUEUE_GAUGE)
.setDescription("Request processor gauge")
private void gaugeMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) {
targetGauge = bridgeMeter.gaugeBuilder(metricsName)
.setDescription("Gauge of total messages ")
.ofLongs()
.buildWithCallback(measurement -> {
measurement.record(eventRuleQueue.size(), newAttributesBuilder().put(label, "rule").build());
.buildWithCallback( measurement -> {
measurement.record(count, attributesBuilder.build());
});

}

private void initRequestMetrics() {
messagesInTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_IN_TOTAL)
.setDescription("Total number of incoming messages")
.build();

messagesOutTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL)
.setDescription("Total number of outgoing messages")
.build();

throughputInTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_IN_TOTAL)
.setDescription("Total traffic of incoming messages")
.build();
public void eventbusInEventsTotal(long count) {
AttributesBuilder attributesBuilder = addGroup(ACCOUNT_LABELS);
countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder);
}

throughputOutTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_OUT_TOTAL)
.setDescription("Total traffic of outgoing messages")
.build();
public void eventRuleLatencySeconds(long latency) {
AttributesBuilder attributesBuilder = addGroup(RUNNER_NAME_LABELS);
gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder);
}

public void initRequestMetrics() {
messageSize = bridgeMeter.histogramBuilder(HISTOGRAM_MESSAGE_SIZE)
.setDescription("Incoming messages size")
.ofLongs()
.build();
}

public static void initRuleMetrics(Meter meter) {
public void initRuleMetrics(Meter meter) {
invokeLatency = meter.histogramBuilder(HISTOGRAM_RPC_LATENCY)
.setDescription("invoke latency")
.setUnit("milliseconds")
Expand Down

0 comments on commit 9d655a0

Please sign in to comment.