Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics Count Optimization #11529

Merged
merged 14 commits into from
Feb 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.config.deploy;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.config.Environment;
import org.apache.dubbo.common.config.ReferenceCache;
Expand Down Expand Up @@ -365,8 +364,7 @@ private void initMetricsReporter() {
// TODO compatible with old usage of metrics, remove protocol check after new metrics is ready for use.
if (metricsConfig != null && PROTOCOL_PROMETHEUS.equals(metricsConfig.getProtocol())) {
collector.setCollectEnabled(true);
collector.addApplicationInfo(applicationModel.getApplicationName(), Version.getVersion());
collector.addThreadPool(applicationModel.getFrameworkModel(), applicationModel.getApplicationName());
collector.collectApplication(applicationModel);
String protocol = metricsConfig.getProtocol();
if (StringUtils.isNotEmpty(protocol)) {
MetricsReporterFactory metricsReporterFactory = getExtensionLoader(MetricsReporterFactory.class).getAdaptiveExtension();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,6 @@ public enum Type {
UNKNOWN_FAILED,
TOTAL_FAILED,
APPLICATION_INFO

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import static org.apache.dubbo.common.utils.NetUtils.getLocalHost;
import static org.apache.dubbo.common.utils.NetUtils.getLocalHostName;

public class ThreadPoolMetric {
public class ThreadPoolMetric implements Metric{

private String applicationName;

Expand Down Expand Up @@ -91,7 +91,6 @@ public Map<String, String> getTags() {
tags.put(TAG_PID, ConfigUtils.getPid()+"");
tags.put(TAG_HOSTNAME, getLocalHostName());
tags.put(TAG_APPLICATION_NAME, applicationName);

tags.put(TAG_THREAD_NAME, threadPoolName);
return tags;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.dubbo.metrics.collector;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.context.ConfigManager;
Expand All @@ -37,6 +33,11 @@
import org.apache.dubbo.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.apache.dubbo.metrics.model.MetricsCategory.QPS;
import static org.apache.dubbo.metrics.model.MetricsCategory.REQUESTS;
import static org.apache.dubbo.metrics.model.MetricsCategory.RT;
Expand Down Expand Up @@ -158,7 +159,6 @@ private void collectRequests(List<MetricSample> list) {
timeoutRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_TIMEOUT_AGG, k.getTags(), REQUESTS, v::get)));
limitRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_LIMIT_AGG, k.getTags(), REQUESTS, v::get)));
totalFailedRequests.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_TOTAL_FAILED_AGG, k.getTags(), REQUESTS, v::get)));

}

private void collectQPS(List<MetricSample> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,215 +17,110 @@

package org.apache.dubbo.metrics.collector;

import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.metrics.collector.stat.MetricsStatComposite;
import org.apache.dubbo.metrics.collector.stat.MetricsStatHandler;
import org.apache.dubbo.metrics.event.EmptyEvent;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.metrics.collector.sample.MethodMetricsSampler;
import org.apache.dubbo.metrics.collector.sample.MetricsCountSampleConfigurer;
import org.apache.dubbo.metrics.collector.sample.MetricsSampler;
import org.apache.dubbo.metrics.collector.sample.SimpleMetricsCountSampler;
import org.apache.dubbo.metrics.collector.sample.ThreadPoolMetricsSampler;
import org.apache.dubbo.metrics.event.MetricsEvent;
import org.apache.dubbo.metrics.event.SimpleMetricsEventMulticaster;
import org.apache.dubbo.metrics.listener.MetricsListener;
import org.apache.dubbo.metrics.model.MetricsKey;
import org.apache.dubbo.metrics.model.ThreadPoolMetric;
import org.apache.dubbo.metrics.model.ApplicationMetric;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import static org.apache.dubbo.metrics.model.MetricsCategory.THREAD_POOL;
import static org.apache.dubbo.metrics.model.MetricsCategory.APPLICATION;
import static org.apache.dubbo.metrics.model.MetricsCategory.REQUESTS;
import static org.apache.dubbo.metrics.model.MetricsCategory.RT;

import static org.apache.dubbo.metrics.model.MetricsKey.APPLICATION_METRIC_INFO;

/**
* Default implementation of {@link MetricsCollector}
*/
public class DefaultMetricsCollector implements MetricsCollector {

private AtomicBoolean collectEnabled = new AtomicBoolean(false);
private final Set<ThreadPoolMetric> threadPoolMetricSet = new HashSet<ThreadPoolMetric>();
private final MetricsStatComposite stats;
private final SimpleMetricsEventMulticaster eventMulticaster;
private MethodMetricsSampler methodSampler = new MethodMetricsSampler(this);
private ThreadPoolMetricsSampler threadPoolSampler = new ThreadPoolMetricsSampler(this);
private String applicationName;
private ApplicationModel applicationModel;
private List<MetricsSampler> samplers = new ArrayList<>();

public DefaultMetricsCollector() {
this.stats = new MetricsStatComposite( this);
this.eventMulticaster = SimpleMetricsEventMulticaster.getInstance();
samplers.add(methodSampler);
samplers.add(applicationSampler);
samplers.add(threadPoolSampler);
}

public void setCollectEnabled(Boolean collectEnabled) {
this.collectEnabled.compareAndSet(isCollectEnabled(), collectEnabled);
}

public Boolean isCollectEnabled() {
return collectEnabled.get();
}

public void increaseTotalRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName, MetricsEvent.Type.TOTAL, invocation);
}

public void increaseSucceedRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName, MetricsEvent.Type.SUCCEED, invocation);
}

public void increaseUnknownFailedRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName, MetricsEvent.Type.UNKNOWN_FAILED, invocation);
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}

public void businessFailedRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName, MetricsEvent.Type.BUSINESS_FAILED, invocation);
public String getApplicationName() {
return this.applicationName;
}

public void timeoutRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName,MetricsEvent.Type.REQUEST_TIMEOUT, invocation);
public ApplicationModel getApplicationModel(){
return this.applicationModel;
}

public void limitRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName,MetricsEvent.Type.REQUEST_LIMIT, invocation);
public SimpleMetricsEventMulticaster getEventMulticaster(){
return this.eventMulticaster;
}

public void increaseProcessingRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName,MetricsEvent.Type.PROCESSING, invocation);
}

public void decreaseProcessingRequests(String applicationName, Invocation invocation) {
decreaseAndPublishEvent(applicationName,MetricsEvent.Type.PROCESSING, invocation);
}

public void totalFailedRequests(String applicationName, Invocation invocation) {
increaseAndPublishEvent(applicationName,MetricsEvent.Type.TOTAL_FAILED, invocation);
}

private void increaseAndPublishEvent(String applicationName, MetricsEvent.Type total, Invocation invocation) {
this.eventMulticaster.publishEvent(doExecute(total, statHandler -> statHandler.increase(applicationName,invocation)));
public void setCollectEnabled(Boolean collectEnabled) {
this.collectEnabled.compareAndSet(isCollectEnabled(), collectEnabled);
}

private void decreaseAndPublishEvent(String applicationName, MetricsEvent.Type type, Invocation invocation) {
this.eventMulticaster.publishEvent(doExecute(type, statHandler -> statHandler.decrease(applicationName,invocation)));
public Boolean isCollectEnabled() {
return collectEnabled.get();
}

public void addRT(String applicationName,Invocation invocation, Long responseTime) {
this.eventMulticaster.publishEvent(stats.addRtAndRetrieveEvent(applicationName,invocation, responseTime));
}
public void addApplicationInfo(String applicationName, String version) {
doExecute(MetricsEvent.Type.APPLICATION_INFO, statHandler -> statHandler.addApplication(applicationName,version));
public MethodMetricsSampler getMethodSampler(){
return this.methodSampler;
}

public void addThreadPool(FrameworkModel frameworkModel, String applicationName) {
FrameworkExecutorRepository frameworkExecutorRepository =
frameworkModel.getBeanFactory().getBean(FrameworkExecutorRepository.class);
addThreadPoolExecutor(applicationName, "SharedExecutor", frameworkExecutorRepository.getSharedExecutor());
addThreadPoolExecutor(applicationName, "MappingRefreshingExecutor", frameworkExecutorRepository.getMappingRefreshingExecutor());
addThreadPoolExecutor(applicationName, "PoolRouterExecutor", frameworkExecutorRepository.getPoolRouterExecutor());
}

private void addThreadPoolExecutor(String applicationName, String threadPoolName, ExecutorService executorService) {
Optional<ExecutorService> executorOptional = Optional.ofNullable(executorService);
if (executorOptional.isPresent() && executorOptional.get() instanceof ThreadPoolExecutor ) {
threadPoolMetricSet.add(new ThreadPoolMetric(applicationName, threadPoolName,
(ThreadPoolExecutor) executorOptional.get()));
}
public void collectApplication(ApplicationModel applicationModel) {
this.setApplicationName(applicationModel.getApplicationName());
this.applicationModel = applicationModel;
applicationSampler.inc(applicationName,MetricsEvent.Type.APPLICATION_INFO);
}

@Override
public List<MetricSample> collect() {
List<MetricSample> list = new ArrayList<>();
collectApplication(list);
collectRequests(list);
collectRT(list);
collectThreadPool(list);
for (MetricsSampler sampler : samplers) {
List<MetricSample> sample = sampler.sample();
list.addAll(sample);
}
return list;
}

private void collectThreadPool(List<MetricSample> list) {
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_CORE_SIZE, e.getTags(), THREAD_POOL, e::getCorePoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_LARGEST_SIZE, e.getTags(), THREAD_POOL, e::getLargestPoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_MAX_SIZE, e.getTags(), THREAD_POOL, e::getMaximumPoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_ACTIVE_SIZE, e.getTags(), THREAD_POOL, e::getActiveCount)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_THREAD_COUNT, e.getTags(), THREAD_POOL, e::getPoolSize)));
threadPoolMetricSet.forEach(e -> list.add(new GaugeMetricSample(MetricsKey.THREAD_POOL_QUEUE_SIZE, e.getTags(), THREAD_POOL, e::getQueueSize)));
}

private void collectApplication(List<MetricSample> list) {
doCollect(MetricsEvent.Type.APPLICATION_INFO, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.APPLICATION_METRIC_INFO, k.getTags(),
APPLICATION, v::get))));
}

private void collectRequests(List<MetricSample> list) {
doCollect(MetricsEvent.Type.TOTAL, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS, k.getTags(), REQUESTS, v::get))));

doCollect(MetricsEvent.Type.SUCCEED, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_SUCCEED, k.getTags(), REQUESTS, v::get))));

doCollect(MetricsEvent.Type.UNKNOWN_FAILED, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_FAILED, k.getTags(), REQUESTS, v::get))));

doCollect(MetricsEvent.Type.PROCESSING, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_PROCESSING, k.getTags(), REQUESTS, v::get))));

doCollect(MetricsEvent.Type.BUSINESS_FAILED, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUEST_BUSINESS_FAILED, k.getTags(), REQUESTS, v::get))));

doCollect(MetricsEvent.Type.REQUEST_TIMEOUT, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_TIMEOUT, k.getTags(), REQUESTS, v::get))));

doCollect(MetricsEvent.Type.REQUEST_LIMIT, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_LIMIT, k.getTags(), REQUESTS, v::get))));

doCollect(MetricsEvent.Type.TOTAL_FAILED, MetricsStatHandler::get).filter(e -> !e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_REQUESTS_TOTAL_FAILED, k.getTags(), REQUESTS, v::get))));

}

private void collectRT(List<MetricSample> list) {
this.stats.getLastRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_RT_LAST, k.getTags(), RT, v::get)));
this.stats.getMinRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_RT_MIN, k.getTags(), RT, v::get)));
this.stats.getMaxRT().forEach((k, v) -> list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_RT_MAX, k.getTags(), RT, v::get)));

this.stats.getTotalRT().forEach((k, v) -> {
list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_RT_SUM, k.getTags(), RT, v::get));

AtomicLong avg = this.stats.getAvgRT().get(k);
AtomicLong count = this.stats.getRtCount().get(k);
avg.set(v.get() / count.get());
list.add(new GaugeMetricSample(MetricsKey.PROVIDER_METRIC_RT_AVG, k.getTags(), RT, avg::get));
});
public void addListener(MetricsListener listener) {
this.eventMulticaster.addListener(listener);
}

private <
T> Optional<T> doCollect(MetricsEvent.Type metricsEventType, Function<MetricsStatHandler, T> statExecutor) {
if (isCollectEnabled()) {
MetricsStatHandler handler = stats.getHandler(metricsEventType);
T result = statExecutor.apply(handler);
return Optional.ofNullable(result);
public SimpleMetricsCountSampler<String,MetricsEvent.Type, ApplicationMetric> applicationSampler = new SimpleMetricsCountSampler<String,MetricsEvent.Type,ApplicationMetric>(){
@Override
public List<MetricSample> sample() {
List<MetricSample> samples = new ArrayList<>();
this.getCount(MetricsEvent.Type.APPLICATION_INFO).filter(e->!e.isEmpty())
.ifPresent(map -> map.forEach((k, v) -> samples.add(new GaugeMetricSample(APPLICATION_METRIC_INFO, k.getTags(),
APPLICATION, v::get))));
return samples;
}
return Optional.empty();
}

private MetricsEvent doExecute(MetricsEvent.Type metricsEventType,
Function<MetricsStatHandler, MetricsEvent> statExecutor) {
if (isCollectEnabled()) {
MetricsStatHandler handler = stats.getHandler(metricsEventType);
return statExecutor.apply(handler);
@Override
protected void countConfigure(
MetricsCountSampleConfigurer<String, MetricsEvent.Type, ApplicationMetric> sampleConfigure) {
sampleConfigure.configureMetrics(configure -> new ApplicationMetric(sampleConfigure.getSource(),
Version.getVersion()));
}
return EmptyEvent.instance();
}

public void addListener(MetricsListener listener) {
this.eventMulticaster.addListener(listener);
}
};
}
Loading