Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services, xds, orca: LRS named metrics support #10282

Merged
merged 9 commits into from
Jul 11, 2023
30 changes: 29 additions & 1 deletion services/src/main/java/io/grpc/services/CallMetricRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class CallMetricRecorder {
new AtomicReference<>();
private final AtomicReference<ConcurrentHashMap<String, Double>> requestCostMetrics =
new AtomicReference<>();
private final AtomicReference<ConcurrentHashMap<String, Double>> namedMetrics =
new AtomicReference<>();
private double cpuUtilizationMetric = 0;
private double applicationUtilizationMetric = 0;
private double memoryUtilizationMetric = 0;
Expand Down Expand Up @@ -127,6 +129,27 @@ public CallMetricRecorder recordRequestCostMetric(String name, double value) {
return this;
}

/**
* Records an application-specific opaque custom metric measurement. If RPC has already finished,
* this method is no-op.
*
* <p>A latter record will overwrite its former name-sakes.
*
* @return this recorder object
*/
public CallMetricRecorder recordNamedMetric(String name, double value) {
if (disabled) {
return this;
}
if (namedMetrics.get() == null) {
// The chance of race of creation of the map should be very small, so it should be fine
// to create these maps that might be discarded.
namedMetrics.compareAndSet(null, new ConcurrentHashMap<String, Double>());
}
namedMetrics.get().put(name, value);
return this;
}

/**
* Records a call metric measurement for CPU utilization in the range [0, inf). Values outside the
* valid range are ignored. If RPC has already finished, this method is no-op.
Expand Down Expand Up @@ -235,12 +258,17 @@ Map<String, Double> finalizeAndDump() {
MetricReport finalizeAndDump2() {
Map<String, Double> savedRequestCostMetrics = finalizeAndDump();
Map<String, Double> savedUtilizationMetrics = utilizationMetrics.get();
Map<String, Double> savedNamedMetrics = namedMetrics.get();
if (savedUtilizationMetrics == null) {
savedUtilizationMetrics = Collections.emptyMap();
}
if (savedNamedMetrics == null) {
savedNamedMetrics = Collections.emptyMap();
}
return new MetricReport(cpuUtilizationMetric, applicationUtilizationMetric,
memoryUtilizationMetric, qps, eps, Collections.unmodifiableMap(savedRequestCostMetrics),
Collections.unmodifiableMap(savedUtilizationMetrics)
Collections.unmodifiableMap(savedUtilizationMetrics),
Collections.unmodifiableMap(savedNamedMetrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) {

public static MetricReport createMetricReport(double cpuUtilization,
double applicationUtilization, double memoryUtilization, double qps, double eps,
Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics) {
Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics,
Map<String, Double> namedMetrics) {
return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps,
requestCostMetrics, utilizationMetrics);
requestCostMetrics, utilizationMetrics, namedMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,6 @@ public void clearEpsMetric() {

MetricReport getMetricReport() {
return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps,
Collections.emptyMap(), Collections.unmodifiableMap(metricsData));
Collections.emptyMap(), Collections.unmodifiableMap(metricsData), Collections.emptyMap());
}
}
9 changes: 8 additions & 1 deletion services/src/main/java/io/grpc/services/MetricReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,19 @@ public final class MetricReport {
private double eps;
private Map<String, Double> requestCostMetrics;
private Map<String, Double> utilizationMetrics;
private Map<String, Double> namedMetrics;

MetricReport(double cpuUtilization, double applicationUtilization, double memoryUtilization,
double qps, double eps, Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
Map<String, Double> utilizationMetrics, Map<String, Double> namedMetrics) {
this.cpuUtilization = cpuUtilization;
this.applicationUtilization = applicationUtilization;
this.memoryUtilization = memoryUtilization;
this.qps = qps;
this.eps = eps;
this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics");
this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics");
this.namedMetrics = checkNotNull(namedMetrics, "namedMetrics");
}

public double getCpuUtilization() {
Expand All @@ -68,6 +70,10 @@ public Map<String, Double> getUtilizationMetrics() {
return utilizationMetrics;
}

public Map<String, Double> getNamedMetrics() {
return namedMetrics;
}

public double getQps() {
return qps;
}
Expand All @@ -84,6 +90,7 @@ public String toString() {
.add("memoryUtilization", memoryUtilization)
.add("requestCost", requestCostMetrics)
.add("utilization", utilizationMetrics)
.add("named", namedMetrics)
.add("qps", qps)
.add("eps", eps)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public void dumpDumpsAllSavedMetricValues() {
recorder.recordRequestCostMetric("cost1", 37465.12);
recorder.recordRequestCostMetric("cost2", 10293.0);
recorder.recordRequestCostMetric("cost3", 1.0);
recorder.recordNamedMetric("named1", 0.2233);
recorder.recordNamedMetric("named2", -1.618);
recorder.recordNamedMetric("named3", 3.1415926535);
recorder.recordCpuUtilizationMetric(0.1928);
recorder.recordApplicationUtilizationMetric(0.9987);
recorder.recordMemoryUtilizationMetric(0.474);
Expand All @@ -55,13 +58,18 @@ public void dumpDumpsAllSavedMetricValues() {
.containsExactly("util1", 0.154353423, "util2", 0.1367, "util3", 0.143734);
Truth.assertThat(dump.getRequestCostMetrics())
.containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0);
Truth.assertThat(dump.getNamedMetrics())
.containsExactly("named1", 0.2233, "named2", -1.618, "named3", 3.1415926535);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928);
Truth.assertThat(dump.getApplicationUtilization()).isEqualTo(0.9987);
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474);
Truth.assertThat(dump.getQps()).isEqualTo(2522.54);
Truth.assertThat(dump.getEps()).isEqualTo(1.618);
Truth.assertThat(dump.toString()).contains("eps=1.618");
Truth.assertThat(dump.toString()).contains("applicationUtilization=0.9987");
Truth.assertThat(dump.toString()).contains("named1=0.2233");
Truth.assertThat(dump.toString()).contains("named2=-1.618");
Truth.assertThat(dump.toString()).contains("named3=3.1415926535");
}

@Test
Expand All @@ -71,6 +79,7 @@ public void noMetricsRecordedAfterSnapshot() {
recorder.recordUtilizationMetric("cost", 0.154353423);
recorder.recordQpsMetric(3.14159);
recorder.recordEpsMetric(1.618);
recorder.recordNamedMetric("named1", 2.718);
assertThat(recorder.finalizeAndDump()).isEqualTo(initDump);
}

Expand Down Expand Up @@ -121,6 +130,9 @@ public void lastValueWinForMetricsWithSameName() {
recorder.recordUtilizationMetric("util1", 0.2837421);
recorder.recordMemoryUtilizationMetric(0.93840);
recorder.recordUtilizationMetric("util1", 0.843233);
recorder.recordNamedMetric("named1", 0.2233);
recorder.recordNamedMetric("named2", 2.718);
recorder.recordNamedMetric("named1", 3.1415926535);
recorder.recordQpsMetric(1928.3);
recorder.recordQpsMetric(100.8);
recorder.recordEpsMetric(3.14159);
Expand All @@ -133,6 +145,8 @@ public void lastValueWinForMetricsWithSameName() {
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.93840);
Truth.assertThat(dump.getUtilizationMetrics())
.containsExactly("util1", 0.843233);
Truth.assertThat(dump.getNamedMetrics())
.containsExactly("named1", 3.1415926535, "named2", 2.718);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
Truth.assertThat(dump.getQps()).isEqualTo(100.8);
Truth.assertThat(dump.getEps()).isEqualTo(1.618);
Expand Down
25 changes: 24 additions & 1 deletion xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.grpc.Status;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.internal.ObjectPool;
import io.grpc.services.MetricReport;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel;
import io.grpc.util.GracefulSwitchLoadBalancer;
Expand All @@ -47,6 +48,8 @@
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import io.grpc.xds.orca.OrcaPerRequestUtil;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -329,7 +332,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (stats != null) {
ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
stats, inFlights, result.getStreamTracerFactory());
return PickResult.withSubchannel(result.getSubchannel(), tracerFactory);
ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
}
}
return result;
Expand Down Expand Up @@ -386,4 +391,22 @@ public void streamClosed(Status status) {
};
}
}

private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {

private final ClusterLocalityStats stats;

private OrcaPerRpcListener(ClusterLocalityStats stats) {
this.stats = checkNotNull(stats, "stats");
}

/**
* Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
* included in the snapshot for the LRS report sent to the LRS server.
*/
@Override
public void onLoadReport(MetricReport report) {
stats.recordBackendLoadMetricStats(report.getNamedMetrics());
}
}
}
12 changes: 11 additions & 1 deletion xds/src/main/java/io/grpc/xds/LoadReportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -361,7 +362,16 @@ private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
.setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
.setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
.setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
.setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()));
.setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())
.addAllLoadMetricStats(
upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map(
e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder()
.setMetricName(e.getKey())
.setNumRequestsFinishedWithMetric(
e.getValue().numRequestsFinishedWithMetric())
.setTotalMetricValue(e.getValue().totalMetricValue())
.build())
.collect(Collectors.toList())));
}
for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
builder.addDroppedRequests(
Expand Down
33 changes: 30 additions & 3 deletions xds/src/main/java/io/grpc/xds/LoadStatsManager2.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
import io.grpc.Status;
import io.grpc.xds.Stats.BackendLoadMetricStats;
import io.grpc.xds.Stats.ClusterStats;
import io.grpc.xds.Stats.DroppedRequests;
import io.grpc.xds.Stats.UpstreamLocalityStats;
Expand Down Expand Up @@ -197,7 +198,7 @@ synchronized List<ClusterStats> getClusterStatsReports(String cluster) {
}
UpstreamLocalityStats upstreamLocalityStats = UpstreamLocalityStats.create(
locality, snapshot.callsIssued, snapshot.callsSucceeded, snapshot.callsFailed,
snapshot.callsInProgress);
snapshot.callsInProgress, snapshot.loadMetricStatsMap);
builder.addUpstreamLocalityStats(upstreamLocalityStats);
// Use the max (drops/loads) recording interval as the overall interval for the
// cluster's stats. In general, they should be mostly identical.
Expand Down Expand Up @@ -322,6 +323,7 @@ final class ClusterLocalityStats {
private final AtomicLong callsSucceeded = new AtomicLong();
private final AtomicLong callsFailed = new AtomicLong();
private final AtomicLong callsIssued = new AtomicLong();
private Map<String, BackendLoadMetricStats> loadMetricStatsMap = new HashMap<>();

private ClusterLocalityStats(
String clusterName, @Nullable String edsServiceName, Locality locality,
Expand Down Expand Up @@ -353,6 +355,23 @@ void recordCallFinished(Status status) {
}
}

/**
* Records all custom named backend load metric stats for per-call load reporting. For each
* metric key {@code name}, creates a new {@link BackendLoadMetricStats} with a finished
* requests counter of 1 and the {@code value} if the key is not present in the map. Otherwise,
* increments the finished requests counter and adds the {@code value} to the existing
* {@link BackendLoadMetricStats}.
*/
synchronized void recordBackendLoadMetricStats(Map<String, Double> namedMetrics) {
namedMetrics.forEach((name, value) -> {
if (!loadMetricStatsMap.containsKey(name)) {
loadMetricStatsMap.put(name, new BackendLoadMetricStats(1, value));
} else {
loadMetricStatsMap.get(name).addMetricValueAndIncrementRequestsFinished(value);
}
});
}

/**
* Release the <i>hard</i> reference for this stats object (previously obtained via {@link
* LoadStatsManager2#getClusterLocalityStats}). The object may still be
Expand All @@ -367,8 +386,13 @@ void release() {
private ClusterLocalityStatsSnapshot snapshot() {
long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS);
stopwatch.reset().start();
Map<String, BackendLoadMetricStats> loadMetricStatsMapCopy;
synchronized (this) {
loadMetricStatsMapCopy = Collections.unmodifiableMap(loadMetricStatsMap);
loadMetricStatsMap = new HashMap<>();
}
return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(),
callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration);
callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration, loadMetricStatsMapCopy);
}
}

Expand All @@ -378,15 +402,18 @@ private static final class ClusterLocalityStatsSnapshot {
private final long callsFailed;
private final long callsIssued;
private final long durationNano;
private final Map<String, BackendLoadMetricStats> loadMetricStatsMap;

private ClusterLocalityStatsSnapshot(
long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued,
long durationNano) {
long durationNano, Map<String, BackendLoadMetricStats> loadMetricStatsMap) {
this.callsSucceeded = callsSucceeded;
this.callsInProgress = callsInProgress;
this.callsFailed = callsFailed;
this.callsIssued = callsIssued;
this.durationNano = durationNano;
this.loadMetricStatsMap = Collections.unmodifiableMap(
checkNotNull(loadMetricStatsMap, "loadMetricStatsMap"));
}
}
}
41 changes: 39 additions & 2 deletions xds/src/main/java/io/grpc/xds/Stats.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.annotation.Nullable;

/** Represents client load stats. */
Expand Down Expand Up @@ -101,10 +103,45 @@ abstract static class UpstreamLocalityStats {

abstract long totalRequestsInProgress();

abstract ImmutableMap<String, BackendLoadMetricStats> loadMetricStatsMap();

static UpstreamLocalityStats create(Locality locality, long totalIssuedRequests,
long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress) {
long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress,
Map<String, BackendLoadMetricStats> loadMetricStatsMap) {
return new AutoValue_Stats_UpstreamLocalityStats(locality, totalIssuedRequests,
totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress);
totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress,
ImmutableMap.copyOf(loadMetricStatsMap));
}
}

/**
* Load metric stats for multi-dimensional load balancing.
*/
static final class BackendLoadMetricStats {

private long numRequestsFinishedWithMetric;
private double totalMetricValue;

BackendLoadMetricStats(long numRequestsFinishedWithMetric, double totalMetricValue) {
this.numRequestsFinishedWithMetric = numRequestsFinishedWithMetric;
this.totalMetricValue = totalMetricValue;
}

public long numRequestsFinishedWithMetric() {
return numRequestsFinishedWithMetric;
}

public double totalMetricValue() {
return totalMetricValue;
}

/**
* Adds the given {@code metricValue} and increments the number of requests finished counter for
* the existing {@link BackendLoadMetricStats}.
*/
public void addMetricValueAndIncrementRequestsFinished(double metricValue) {
numRequestsFinishedWithMetric += 1;
totalMetricValue += metricValue;
}
}
}
Loading