diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index ee86b70c..d7891dad 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -29,6 +29,8 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,12 @@ public void run() { List pullRecordList = Lists.newArrayList(); try { pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>()); - metricsManager.eventbusInEventsTotal(pullRecordList.size()); + + for (ConnectRecord connectRecord : pullRecordList) { + String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + String accountId = getAccountId(connectRecord); + metricsManager.eventbusInEventsTotal(runnerName, accountId, "success", 1); + } if (CollectionUtils.isEmpty(pullRecordList)) { this.waitForRunning(1000); continue; @@ -69,11 +76,22 @@ public void run() { circulatorContext.offerEventRecords(pullRecordList); } catch (Exception exception) { logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception); - pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception)); + pullRecordList.forEach(pullRecord -> { + String runnerName = pullRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + String accountId = getAccountId(pullRecord); + metricsManager.eventbusInEventsTotal(runnerName, accountId, "failed", 1); + errorHandler.handle(pullRecord, exception); + }); } } } + public String getAccountId(ConnectRecord connectRecord) { + String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName); + return runnerConfig.getAccountId(); + } + @Override public String getServiceName() { return EventBusListener.class.getSimpleName(); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 2939dc66..dda1f954 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + import javax.annotation.PostConstruct; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.eventbridge.BridgeMetricsManager; @@ -31,6 +33,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; import org.slf4j.Logger; @@ -73,7 +76,6 @@ public void run() { List afterTransformConnect= Lists.newArrayList(); while (!stopped) { try { - long startTime = System.currentTimeMillis(); Map> eventRecordMap = circulatorContext.takeEventRecords(batchSize); if (MapUtils.isEmpty(eventRecordMap)) { logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); @@ -90,18 +92,24 @@ public void run() { afterTransformConnect.clear(); List> completableFutures = Lists.newArrayList(); for (String runnerName : eventRecordMap.keySet()) { + TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName); TransformEngine curTransformEngine = latestTransformMap.get(runnerName); List curEventRecords = eventRecordMap.get(runnerName); curEventRecords.forEach(pullRecord -> { + long startTime = System.currentTimeMillis(); CompletableFuture transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord)) .exceptionally((exception) -> { logger.error("transfer do transform event record failed,stackTrace-", exception); + //failed + metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "failed", System.currentTimeMillis() - startTime); errorHandler.handle(pullRecord, exception); return null; }) .thenAccept(pushRecord -> { if (Objects.nonNull(pushRecord)) { afterTransformConnect.add(pushRecord); + // success + metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "success",System.currentTimeMillis() - startTime); } else { offsetManager.commit(pullRecord); } @@ -109,12 +117,9 @@ public void run() { completableFutures.add(transformFuture); }); } - long endTime = System.currentTimeMillis(); - long latency = endTime - startTime; CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); //success - metricsManager.eventRuleLatencySeconds(latency); logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); } catch (Exception exception) { //failed diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index dc81663c..2474f939 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -263,13 +263,15 @@ private void gaugeMetrics(String metricsName, long count, AttributesBuilder attr }); } - public void eventbusInEventsTotal(long count) { - AttributesBuilder attributesBuilder = addGroup(ACCOUNT_LABELS); + public void eventbusInEventsTotal(String runnerName, String accountId, String status, long count) { + Map labelMaps = buildLabelMap(runnerName, accountId, status); + AttributesBuilder attributesBuilder = addGroup(labelMaps); countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder); } - public void eventRuleLatencySeconds(long latency) { - AttributesBuilder attributesBuilder = addGroup(RUNNER_NAME_LABELS); + public void eventRuleLatencySeconds(String runnerName, String accountId ,String status, long latency) { + Map labelMaps = buildLabelMap(runnerName, accountId, status); + AttributesBuilder attributesBuilder = addGroup(labelMaps); gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder); } @@ -328,4 +330,20 @@ public void shutdown() { loggingMetricExporter.shutdown(); } } + + private Map buildLabelMap(String runnerName, String accountId, String status) { + Map labelMap = new HashMap<>(); + if (StringUtils.isNotBlank(runnerName)) { + labelMap.put("runner_name", runnerName); + } + + if (StringUtils.isNotBlank(accountId)) { + + labelMap.put("account_id", accountId); + } + if (StringUtils.isNotBlank(status)) { + labelMap.put("status", status); + } + return labelMap; + } }