Skip to content

Commit

Permalink
refactor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
chengxy committed Jul 18, 2023
1 parent 9d655a0 commit 8af194e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,19 +63,35 @@ public void run() {
List<ConnectRecord> pullRecordList = Lists.newArrayList();
try {
pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>());
metricsManager.eventbusInEventsTotal(pullRecordList.size());

for (ConnectRecord connectRecord : pullRecordList) {
String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
String accountId = getAccountId(connectRecord);
metricsManager.eventbusInEventsTotal(runnerName, accountId, "success", 1);
}
if (CollectionUtils.isEmpty(pullRecordList)) {
this.waitForRunning(1000);
continue;
}
circulatorContext.offerEventRecords(pullRecordList);
} catch (Exception exception) {
logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception));
pullRecordList.forEach(pullRecord -> {
String runnerName = pullRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
String accountId = getAccountId(pullRecord);
metricsManager.eventbusInEventsTotal(runnerName, accountId, "failed", 1);
errorHandler.handle(pullRecord, exception);
});
}
}
}

public String getAccountId(ConnectRecord connectRecord) {
String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
return runnerConfig.getAccountId();
}

@Override
public String getServiceName() {
return EventBusListener.class.getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.PostConstruct;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -73,7 +76,6 @@ public void run() {
List<ConnectRecord> afterTransformConnect= Lists.newArrayList();
while (!stopped) {
try {
long startTime = System.currentTimeMillis();
Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize);
if (MapUtils.isEmpty(eventRecordMap)) {
logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis());
Expand All @@ -90,31 +92,34 @@ public void run() {
afterTransformConnect.clear();
List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
for (String runnerName : eventRecordMap.keySet()) {
TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
curEventRecords.forEach(pullRecord -> {
long startTime = System.currentTimeMillis();
CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
logger.error("transfer do transform event record failed,stackTrace-", exception);
//failed
metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "failed", System.currentTimeMillis() - startTime);
errorHandler.handle(pullRecord, exception);
return null;
})
.thenAccept(pushRecord -> {
if (Objects.nonNull(pushRecord)) {
afterTransformConnect.add(pushRecord);
// success
metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "success",System.currentTimeMillis() - startTime);
} else {
offsetManager.commit(pullRecord);
}
});
completableFutures.add(transformFuture);
});
}
long endTime = System.currentTimeMillis();
long latency = endTime - startTime;
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get();
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
//success
metricsManager.eventRuleLatencySeconds(latency);
logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
//failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,15 @@ private void gaugeMetrics(String metricsName, long count, AttributesBuilder attr
});
}

public void eventbusInEventsTotal(long count) {
AttributesBuilder attributesBuilder = addGroup(ACCOUNT_LABELS);
public void eventbusInEventsTotal(String runnerName, String accountId, String status, long count) {
Map<String, String> labelMaps = buildLabelMap(runnerName, accountId, status);
AttributesBuilder attributesBuilder = addGroup(labelMaps);
countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder);
}

public void eventRuleLatencySeconds(long latency) {
AttributesBuilder attributesBuilder = addGroup(RUNNER_NAME_LABELS);
public void eventRuleLatencySeconds(String runnerName, String accountId ,String status, long latency) {
Map<String, String> labelMaps = buildLabelMap(runnerName, accountId, status);
AttributesBuilder attributesBuilder = addGroup(labelMaps);
gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder);
}

Expand Down Expand Up @@ -328,4 +330,20 @@ public void shutdown() {
loggingMetricExporter.shutdown();
}
}

private Map<String, String> buildLabelMap(String runnerName, String accountId, String status) {
Map<String, String> labelMap = new HashMap<>();
if (StringUtils.isNotBlank(runnerName)) {
labelMap.put("runner_name", runnerName);
}

if (StringUtils.isNotBlank(accountId)) {

labelMap.put("account_id", accountId);
}
if (StringUtils.isNotBlank(status)) {
labelMap.put("status", status);
}
return labelMap;
}
}

0 comments on commit 8af194e

Please sign in to comment.