From 8ff32e1b370a6d5edd02d8a0999ad043f38f7cde Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Fri, 26 May 2023 22:40:22 +0000 Subject: [PATCH 1/9] add namedMetrics map & API to CallMetricRecorder --- .../io/grpc/services/CallMetricRecorder.java | 30 ++++++- .../services/InternalCallMetricRecorder.java | 5 +- .../java/io/grpc/services/MetricRecorder.java | 2 +- .../java/io/grpc/services/MetricReport.java | 9 +- .../grpc/services/CallMetricRecorderTest.java | 13 +++ .../io/grpc/xds/orca/OrcaPerRequestUtil.java | 2 +- .../WeightedRoundRobinLoadBalancerTest.java | 84 +++++++++---------- .../grpc/xds/orca/OrcaPerRequestUtilTest.java | 3 +- 8 files changed, 99 insertions(+), 49 deletions(-) diff --git a/services/src/main/java/io/grpc/services/CallMetricRecorder.java b/services/src/main/java/io/grpc/services/CallMetricRecorder.java index f491fb7542e..d480f0f4c3b 100644 --- a/services/src/main/java/io/grpc/services/CallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/CallMetricRecorder.java @@ -41,6 +41,8 @@ public final class CallMetricRecorder { new AtomicReference<>(); private final AtomicReference> requestCostMetrics = new AtomicReference<>(); + private final AtomicReference> namedMetrics = + new AtomicReference<>(); private double cpuUtilizationMetric = 0; private double applicationUtilizationMetric = 0; private double memoryUtilizationMetric = 0; @@ -127,6 +129,27 @@ public CallMetricRecorder recordRequestCostMetric(String name, double value) { return this; } + /** + * Records an application-specific opaque custom metric measurement. If RPC has already finished, + * this method is no-op. + * + *

A latter record will overwrite its former name-sakes. + * + * @return this recorder object + */ + public CallMetricRecorder recordNamedMetric(String name, double value) { + if (disabled) { + return this; + } + if (namedMetrics.get() == null) { + // The chance of race of creation of the map should be very small, so it should be fine + // to create these maps that might be discarded. + namedMetrics.compareAndSet(null, new ConcurrentHashMap()); + } + namedMetrics.get().put(name, value); + return this; + } + /** * Records a call metric measurement for CPU utilization in the range [0, inf). Values outside the * valid range are ignored. If RPC has already finished, this method is no-op. @@ -235,12 +258,17 @@ Map finalizeAndDump() { MetricReport finalizeAndDump2() { Map savedRequestCostMetrics = finalizeAndDump(); Map savedUtilizationMetrics = utilizationMetrics.get(); + Map savedNamedMetrics = namedMetrics.get(); if (savedUtilizationMetrics == null) { savedUtilizationMetrics = Collections.emptyMap(); } + if (savedNamedMetrics == null) { + savedNamedMetrics = Collections.emptyMap(); + } return new MetricReport(cpuUtilizationMetric, applicationUtilizationMetric, memoryUtilizationMetric, qps, eps, Collections.unmodifiableMap(savedRequestCostMetrics), - Collections.unmodifiableMap(savedUtilizationMetrics) + Collections.unmodifiableMap(savedUtilizationMetrics), + Collections.unmodifiableMap(savedNamedMetrics) ); } diff --git a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java index 3b0cbbbda35..a7ff1e5c32e 100644 --- a/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/InternalCallMetricRecorder.java @@ -47,8 +47,9 @@ public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) { public static MetricReport createMetricReport(double cpuUtilization, double applicationUtilization, double memoryUtilization, double qps, double eps, - Map requestCostMetrics, Map utilizationMetrics) { + Map requestCostMetrics, Map utilizationMetrics, + Map namedMetrics) { return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps, - requestCostMetrics, utilizationMetrics); + requestCostMetrics, utilizationMetrics, namedMetrics); } } diff --git a/services/src/main/java/io/grpc/services/MetricRecorder.java b/services/src/main/java/io/grpc/services/MetricRecorder.java index c585e7b5407..39b0e8df25f 100644 --- a/services/src/main/java/io/grpc/services/MetricRecorder.java +++ b/services/src/main/java/io/grpc/services/MetricRecorder.java @@ -155,6 +155,6 @@ public void clearEpsMetric() { MetricReport getMetricReport() { return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps, - Collections.emptyMap(), Collections.unmodifiableMap(metricsData)); + Collections.emptyMap(), Collections.unmodifiableMap(metricsData), Collections.emptyMap()); } } diff --git a/services/src/main/java/io/grpc/services/MetricReport.java b/services/src/main/java/io/grpc/services/MetricReport.java index 35cbfc056bf..e559f0f00b5 100644 --- a/services/src/main/java/io/grpc/services/MetricReport.java +++ b/services/src/main/java/io/grpc/services/MetricReport.java @@ -35,10 +35,11 @@ public final class MetricReport { private double eps; private Map requestCostMetrics; private Map utilizationMetrics; + private Map namedMetrics; MetricReport(double cpuUtilization, double applicationUtilization, double memoryUtilization, double qps, double eps, Map requestCostMetrics, - Map utilizationMetrics) { + Map utilizationMetrics, Map namedMetrics) { this.cpuUtilization = cpuUtilization; this.applicationUtilization = applicationUtilization; this.memoryUtilization = memoryUtilization; @@ -46,6 +47,7 @@ public final class MetricReport { this.eps = eps; this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics"); this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics"); + this.namedMetrics = checkNotNull(namedMetrics, "namedMetrics"); } public double getCpuUtilization() { @@ -68,6 +70,10 @@ public Map getUtilizationMetrics() { return utilizationMetrics; } + public Map getNamedMetrics() { + return namedMetrics; + } + public double getQps() { return qps; } @@ -84,6 +90,7 @@ public String toString() { .add("memoryUtilization", memoryUtilization) .add("requestCost", requestCostMetrics) .add("utilization", utilizationMetrics) + .add("named", namedMetrics) .add("qps", qps) .add("eps", eps) .toString(); diff --git a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java index cb0bfc5d83f..4ef06047f4d 100644 --- a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java +++ b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java @@ -44,6 +44,9 @@ public void dumpDumpsAllSavedMetricValues() { recorder.recordRequestCostMetric("cost1", 37465.12); recorder.recordRequestCostMetric("cost2", 10293.0); recorder.recordRequestCostMetric("cost3", 1.0); + recorder.recordNamedMetric("named1", 0.2233); + recorder.recordNamedMetric("named2", -1.618); + recorder.recordNamedMetric("named3", 3.1415926535); recorder.recordCpuUtilizationMetric(0.1928); recorder.recordApplicationUtilizationMetric(0.9987); recorder.recordMemoryUtilizationMetric(0.474); @@ -55,6 +58,8 @@ public void dumpDumpsAllSavedMetricValues() { .containsExactly("util1", 0.154353423, "util2", 0.1367, "util3", 0.143734); Truth.assertThat(dump.getRequestCostMetrics()) .containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0); + Truth.assertThat(dump.getNamedMetrics()) + .containsExactly("named1", 0.2233, "named2", -1.618, "named3", 3.1415926535); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928); Truth.assertThat(dump.getApplicationUtilization()).isEqualTo(0.9987); Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474); @@ -62,6 +67,9 @@ public void dumpDumpsAllSavedMetricValues() { Truth.assertThat(dump.getEps()).isEqualTo(1.618); Truth.assertThat(dump.toString()).contains("eps=1.618"); Truth.assertThat(dump.toString()).contains("applicationUtilization=0.9987"); + Truth.assertThat(dump.toString()).contains("named1=0.2233"); + Truth.assertThat(dump.toString()).contains("named2=-1.618"); + Truth.assertThat(dump.toString()).contains("named3=3.1415926535"); } @Test @@ -121,6 +129,9 @@ public void lastValueWinForMetricsWithSameName() { recorder.recordUtilizationMetric("util1", 0.2837421); recorder.recordMemoryUtilizationMetric(0.93840); recorder.recordUtilizationMetric("util1", 0.843233); + recorder.recordNamedMetric("named1", 0.2233); + recorder.recordNamedMetric("named2", 2.718); + recorder.recordNamedMetric("named1", 3.1415926535); recorder.recordQpsMetric(1928.3); recorder.recordQpsMetric(100.8); recorder.recordEpsMetric(3.14159); @@ -133,6 +144,8 @@ public void lastValueWinForMetricsWithSameName() { Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.93840); Truth.assertThat(dump.getUtilizationMetrics()) .containsExactly("util1", 0.843233); + Truth.assertThat(dump.getNamedMetrics()) + .containsExactly("named1", 3.1415926535, "named2", 2.718); Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0); Truth.assertThat(dump.getQps()).isEqualTo(100.8); Truth.assertThat(dump.getEps()).isEqualTo(1.618); diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java index 97b98cd4a28..814015ba93e 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaPerRequestUtil.java @@ -256,7 +256,7 @@ static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) { return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(), loadReport.getApplicationUtilization(), loadReport.getMemUtilization(), loadReport.getRpsFractional(), loadReport.getEps(), loadReport.getRequestCostMap(), - loadReport.getUtilizationMap()); + loadReport.getUtilizationMap(), loadReport.getNamedMetricsMap()); } /** diff --git a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java index daf58a174d9..398cb953733 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRoundRobinLoadBalancerTest.java @@ -214,10 +214,10 @@ public void wrrLifeCycle() { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -260,10 +260,10 @@ public void enableOobLoadReportConfig() { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.9, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.9, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); PickResult pickResult = weightedPicker.pickSubchannel(mockArgs); assertThat(pickResult.getSubchannel()).isEqualTo(weightedSubchannel1); @@ -340,11 +340,11 @@ weightedSubchannel3.new OrcaReportListener(weightedConfig.errorUtilizationPenalt @Test public void pickByWeight_LargeWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 999, 0, new HashMap<>(), new HashMap<>()); + 0.1, 0, 0.1, 999, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.9, 0, 0.1, 2, 0, new HashMap<>(), new HashMap<>()); + 0.9, 0, 0.1, 2, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 999 / 0.1 + 2 / 0.9 + 100 / 0.86; pickByWeight(report1, report2, report3, 999 / 0.1 / totalWeight, 2 / 0.9 / totalWeight, @@ -354,11 +354,11 @@ public void pickByWeight_LargeWeight() { @Test public void pickByWeight_largeWeight_useApplicationUtilization() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.44, 0.1, 0.1, 999, 0, new HashMap<>(), new HashMap<>()); + 0.44, 0.1, 0.1, 999, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0.9, 0.1, 2, 0, new HashMap<>(), new HashMap<>()); + 0.12, 0.9, 0.1, 2, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.33, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.33, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 999 / 0.1 + 2 / 0.9 + 100 / 0.86; pickByWeight(report1, report2, report3, 999 / 0.1 / totalWeight, 2 / 0.9 / totalWeight, @@ -368,11 +368,11 @@ public void pickByWeight_largeWeight_useApplicationUtilization() { @Test public void pickByWeight_largeWeight_withEps_defaultErrorUtilizationPenalty() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 999, 13, new HashMap<>(), new HashMap<>()); + 0.1, 0, 0.1, 999, 13, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.9, 0, 0.1, 2, 1.8, new HashMap<>(), new HashMap<>()); + 0.9, 0, 0.1, 2, 1.8, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 3, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 3, new HashMap<>(), new HashMap<>(), new HashMap<>()); double weight1 = 999 / (0.1 + 13 / 999F * weightedConfig.errorUtilizationPenalty); double weight2 = 2 / (0.9 + 1.8 / 2F * weightedConfig.errorUtilizationPenalty); double weight3 = 100 / (0.86 + 3 / 100F * weightedConfig.errorUtilizationPenalty); @@ -385,11 +385,11 @@ public void pickByWeight_largeWeight_withEps_defaultErrorUtilizationPenalty() { @Test public void pickByWeight_normalWeight() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0, 0.1, 22, 0, new HashMap<>(), new HashMap<>()); + 0.12, 0, 0.1, 22, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0, 0.1, 40, 0, new HashMap<>(), new HashMap<>()); + 0.28, 0, 0.1, 40, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 22 / 0.12 + 40 / 0.28 + 100 / 0.86; pickByWeight(report1, report2, report3, 22 / 0.12 / totalWeight, 40 / 0.28 / totalWeight, 100 / 0.86 / totalWeight @@ -399,11 +399,11 @@ public void pickByWeight_normalWeight() { @Test public void pickByWeight_normalWeight_useApplicationUtilization() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.72, 0.12, 0.1, 22, 0, new HashMap<>(), new HashMap<>()); + 0.72, 0.12, 0.1, 22, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.98, 0.28, 0.1, 40, 0, new HashMap<>(), new HashMap<>()); + 0.98, 0.28, 0.1, 40, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.99, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>()); + 0.99, 0.86, 0.1, 100, 0, new HashMap<>(), new HashMap<>(), new HashMap<>()); double totalWeight = 22 / 0.12 + 40 / 0.28 + 100 / 0.86; pickByWeight(report1, report2, report3, 22 / 0.12 / totalWeight, 40 / 0.28 / totalWeight, 100 / 0.86 / totalWeight @@ -413,11 +413,11 @@ public void pickByWeight_normalWeight_useApplicationUtilization() { @Test public void pickByWeight_normalWeight_withEps_defaultErrorUtilizationPenalty() { MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>(), new HashMap<>()); double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); @@ -433,11 +433,11 @@ public void pickByWeight_normalWeight_withEps_customErrorUtilizationPenalty() { .setErrorUtilizationPenalty(1.75F).build(); MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + 0.12, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + 0.28, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + 0.86, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>(), new HashMap<>()); double weight1 = 22 / (0.12 + 19.7 / 22F * weightedConfig.errorUtilizationPenalty); double weight2 = 40 / (0.28 + 0.998 / 40F * weightedConfig.errorUtilizationPenalty); double weight3 = 100 / (0.86 + 3.14159 / 100F * weightedConfig.errorUtilizationPenalty); @@ -453,11 +453,11 @@ public void pickByWeight_avgWeight_zeroCpuUtilization_withEps_customErrorUtiliza .setErrorUtilizationPenalty(1.75F).build(); MetricReport report1 = InternalCallMetricRecorder.createMetricReport( - 0, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>()); + 0, 0, 0.1, 22, 19.7, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report2 = InternalCallMetricRecorder.createMetricReport( - 0, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>()); + 0, 0, 0.1, 40, 0.998, new HashMap<>(), new HashMap<>(), new HashMap<>()); MetricReport report3 = InternalCallMetricRecorder.createMetricReport( - 0, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>()); + 0, 0, 0.1, 100, 3.14159, new HashMap<>(), new HashMap<>(), new HashMap<>()); double avgSubchannelPickRatio = 1.0 / 3; pickByWeight(report1, report2, report3, avgSubchannelPickRatio, avgSubchannelPickRatio, @@ -508,10 +508,10 @@ public void blackoutPeriod() { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(5, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -568,10 +568,10 @@ public void updateWeightTimer() { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(11, TimeUnit.SECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) .getSubchannel()).isEqualTo(weightedSubchannel1); @@ -585,10 +585,10 @@ weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalt assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); //timer fires, new weight updated assertThat(fakeClock.forwardTime(500, TimeUnit.MILLISECONDS)).isEqualTo(1); assertThat(weightedPicker.pickSubchannel(mockArgs) @@ -619,10 +619,10 @@ public void weightExpired() { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -684,7 +684,7 @@ public void rrFallback() { subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.1, 0, 0.1, qpsByChannel.get(subchannel), 0, - new HashMap<>(), new HashMap<>())); + new HashMap<>(), new HashMap<>(), new HashMap<>())); } assertThat(Math.abs(pickCount.get(weightedSubchannel1) / 1000.0 - 1.0 / 2)) .isAtMost(0.1); @@ -700,7 +700,7 @@ subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoad subchannel.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( 0.1, 0, 0.1, qpsByChannel.get(subchannel), 0, - new HashMap<>(), new HashMap<>())); + new HashMap<>(), new HashMap<>(), new HashMap<>())); fakeClock.forwardTime(50, TimeUnit.MILLISECONDS); } assertThat(pickCount.size()).isEqualTo(2); @@ -738,10 +738,10 @@ public void unknownWeightIsAvgWeight() { WrrSubchannel weightedSubchannel3 = (WrrSubchannel) weightedPicker.getList().get(2); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); Map pickCount = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -782,10 +782,10 @@ public void pickFromOtherThread() throws Exception { WrrSubchannel weightedSubchannel2 = (WrrSubchannel) weightedPicker.getList().get(1); weightedSubchannel1.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.1, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); weightedSubchannel2.new OrcaReportListener(weightedConfig.errorUtilizationPenalty).onLoadReport( InternalCallMetricRecorder.createMetricReport( - 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>())); + 0.2, 0, 0.1, 1, 0, new HashMap<>(), new HashMap<>(), new HashMap<>())); CyclicBarrier barrier = new CyclicBarrier(2); Map pickCount = new ConcurrentHashMap<>(); pickCount.put(weightedSubchannel1, new AtomicInteger(0)); diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java index ef06e6fccfe..4d080502915 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaPerRequestUtilTest.java @@ -124,7 +124,8 @@ static boolean reportEqual(MetricReport a, && a.getQps() == b.getQps() && a.getEps() == b.getEps() && Objects.equal(a.getRequestCostMetrics(), b.getRequestCostMetrics()) - && Objects.equal(a.getUtilizationMetrics(), b.getUtilizationMetrics()); + && Objects.equal(a.getUtilizationMetrics(), b.getUtilizationMetrics()) + && Objects.equal(a.getNamedMetrics(), b.getNamedMetrics()); } /** From c04f5cd66428991f284163cef6f0094c4d9fc1c1 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 29 May 2023 01:32:31 +0000 Subject: [PATCH 2/9] add named metrics to ORCA server interceptor --- .../orca/OrcaMetricReportingServerInterceptor.java | 6 ++++-- .../OrcaMetricReportingServerInterceptorTest.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java index 34922dfc887..1b767e0303c 100644 --- a/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java +++ b/xds/src/main/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptor.java @@ -120,7 +120,8 @@ private static OrcaLoadReport.Builder fromInternalReport(MetricReport internalRe .setRpsFractional(internalReport.getQps()) .setEps(internalReport.getEps()) .putAllUtilization(internalReport.getUtilizationMetrics()) - .putAllRequestCost(internalReport.getRequestCostMetrics()); + .putAllRequestCost(internalReport.getRequestCostMetrics()) + .putAllNamedMetrics(internalReport.getNamedMetrics()); } /** @@ -133,7 +134,8 @@ private static void mergeMetrics( MetricReport callMetricRecorderReport ) { metricRecorderReportBuilder.putAllUtilization(callMetricRecorderReport.getUtilizationMetrics()) - .putAllRequestCost(callMetricRecorderReport.getRequestCostMetrics()); + .putAllRequestCost(callMetricRecorderReport.getRequestCostMetrics()) + .putAllNamedMetrics(callMetricRecorderReport.getNamedMetrics()); // Overwrite only if the values from the given MetricReport for CallMetricRecorder are set double cpu = callMetricRecorderReport.getCpuUtilization(); if (isReportValueSet(cpu)) { diff --git a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java index 00562be3dc5..469cd9363ae 100644 --- a/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java +++ b/xds/src/test/java/io/grpc/xds/orca/OrcaMetricReportingServerInterceptorTest.java @@ -72,6 +72,7 @@ public class OrcaMetricReportingServerInterceptorTest { private final Map applicationUtilizationMetricsMap = new HashMap<>(); private final Map applicationCostMetrics = new HashMap<>(); + private final Map applicationNamedMetrics = new HashMap<>(); private double cpuUtilizationMetrics = 0; private double applicationUtilizationMetrics = 0; private double memoryUtilizationMetrics = 0; @@ -98,6 +99,9 @@ public void unaryRpc( CallMetricRecorder.getCurrent().recordRequestCostMetric(entry.getKey(), entry.getValue()); } + for (Map.Entry entry : applicationNamedMetrics.entrySet()) { + CallMetricRecorder.getCurrent().recordNamedMetric(entry.getKey(), entry.getValue()); + } CallMetricRecorder.getCurrent().recordCpuUtilizationMetric(cpuUtilizationMetrics); CallMetricRecorder.getCurrent() .recordApplicationUtilizationMetric(applicationUtilizationMetrics); @@ -196,6 +200,9 @@ public void responseTrailersContainAllReportedMetricsFromCallMetricRecorder() { applicationUtilizationMetricsMap.put("util1", 0.1082); applicationUtilizationMetricsMap.put("util2", 0.4936); applicationUtilizationMetricsMap.put("util3", 0.5342); + applicationNamedMetrics.put("named1", 0.777); + applicationNamedMetrics.put("named2", 737.747); + applicationNamedMetrics.put("named3", -0.380); cpuUtilizationMetrics = 0.3465; applicationUtilizationMetrics = 0.99887; memoryUtilizationMetrics = 0.764; @@ -209,6 +216,8 @@ public void responseTrailersContainAllReportedMetricsFromCallMetricRecorder() { .containsExactly("util1", 0.1082, "util2", 0.4936, "util3", 0.5342); assertThat(report.getRequestCostMap()) .containsExactly("cost1", 1231.4543, "cost2", 0.1367, "cost3", 7614.145); + assertThat(report.getNamedMetricsMap()) + .containsExactly("named1", 0.777, "named2", 737.747, "named3", -0.380); assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getApplicationUtilization()).isEqualTo(0.99887); assertThat(report.getMemUtilization()).isEqualTo(0.764); @@ -221,6 +230,9 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR applicationUtilizationMetricsMap.put("util1", 0.1482); applicationUtilizationMetricsMap.put("util2", 0.4036); applicationUtilizationMetricsMap.put("util3", 0.5742); + applicationNamedMetrics.put("named1", 0.777); + applicationNamedMetrics.put("named2", 737.747); + applicationNamedMetrics.put("named3", -0.380); cpuUtilizationMetrics = 0.3465; memoryUtilizationMetrics = 0.967; metricRecorder.setApplicationUtilizationMetric(2.718); @@ -240,6 +252,8 @@ public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricR assertThat(report.getUtilizationMap()) .containsExactly("util1", 0.1482, "util2", 0.4036, "util3", 0.5742, "serverUtil1", 0.7467, "serverUtil2", 0.2233); + assertThat(report.getNamedMetricsMap()) + .containsExactly("named1", 0.777, "named2", 737.747, "named3", -0.380); assertThat(report.getRequestCostMap()).isEmpty(); assertThat(report.getCpuUtilization()).isEqualTo(0.3465); assertThat(report.getApplicationUtilization()).isEqualTo(2.718); From 1ee52d6bf6f1c7e2018fb7211b79a8848000aa96 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Wed, 7 Jun 2023 22:24:23 +0000 Subject: [PATCH 3/9] new named metrics struct & ORCA tracer factory in ClusterImpl LB --- .../io/grpc/xds/ClusterImplLoadBalancer.java | 28 ++++++++++++++++++- .../java/io/grpc/xds/LoadReportClient.java | 12 +++++++- .../java/io/grpc/xds/LoadStatsManager2.java | 25 +++++++++++++++-- xds/src/main/java/io/grpc/xds/Stats.java | 27 ++++++++++++++++-- 4 files changed, 85 insertions(+), 7 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 074bb301b58..d61adf02ee4 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -33,6 +33,7 @@ import io.grpc.Status; import io.grpc.internal.ForwardingClientStreamTracer; import io.grpc.internal.ObjectPool; +import io.grpc.services.MetricReport; import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingSubchannel; import io.grpc.util.GracefulSwitchLoadBalancer; @@ -47,9 +48,12 @@ import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import io.grpc.xds.internal.security.SslContextProviderSupplier; +import io.grpc.xds.orca.OrcaPerRequestUtil; +import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -329,7 +333,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (stats != null) { ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory( stats, inFlights, result.getStreamTracerFactory()); - return PickResult.withSubchannel(result.getSubchannel(), tracerFactory); + ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance() + .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats)); + return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory); } } return result; @@ -386,4 +392,24 @@ public void streamClosed(Status status) { }; } } + + private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener { + + private final ClusterLocalityStats stats; + + private OrcaPerRpcListener(ClusterLocalityStats stats) { + this.stats = checkNotNull(stats, "stats"); + } + + /** + * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is + * included in the snapshot for the LRS report sent to the LRS server. + */ + @Override + public void onLoadReport(MetricReport report) { + for (Map.Entry entry : report.getNamedMetrics().entrySet()) { + stats.recordBackendLoadMetricStats(entry.getKey(), entry.getValue()); + } + } + } } diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index 9daa440a3dc..b86c8110f63 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -361,7 +362,16 @@ private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats( .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests()) .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests()) .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress()) - .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())); + .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()) + .addAllLoadMetricStats( + upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map( + e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder() + .setMetricName(e.getKey()) + .setNumRequestsFinishedWithMetric( + e.getValue().numRequestsFinishedWithMetric()) + .setTotalMetricValue(e.getValue().totalMetricValue()) + .build()) + .collect(Collectors.toList()))); } for (DroppedRequests droppedRequests : stats.droppedRequestsList()) { builder.addDroppedRequests( diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java index 4bd0ba437be..4dce0f9d51e 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java @@ -23,6 +23,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Sets; import io.grpc.Status; +import io.grpc.xds.Stats.BackendLoadMetricStats; import io.grpc.xds.Stats.ClusterStats; import io.grpc.xds.Stats.DroppedRequests; import io.grpc.xds.Stats.UpstreamLocalityStats; @@ -37,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -197,7 +199,7 @@ synchronized List getClusterStatsReports(String cluster) { } UpstreamLocalityStats upstreamLocalityStats = UpstreamLocalityStats.create( locality, snapshot.callsIssued, snapshot.callsSucceeded, snapshot.callsFailed, - snapshot.callsInProgress); + snapshot.callsInProgress, snapshot.loadMetricStatsMap); builder.addUpstreamLocalityStats(upstreamLocalityStats); // Use the max (drops/loads) recording interval as the overall interval for the // cluster's stats. In general, they should be mostly identical. @@ -322,6 +324,8 @@ final class ClusterLocalityStats { private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); private final AtomicLong callsIssued = new AtomicLong(); + private final AtomicReference> + loadMetricStatsMap = new AtomicReference<>(new ConcurrentHashMap<>()); private ClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality, @@ -353,6 +357,18 @@ void recordCallFinished(Status status) { } } + /** + * Records a custom named backend load metric stat for per-call load reporting. + */ + void recordBackendLoadMetricStats(String name, double value) { + loadMetricStatsMap.get().merge(name, BackendLoadMetricStats.create(1, value), + (oldBackendLoadMetricStats, newBackendLoadMetricStats) -> BackendLoadMetricStats.create( + oldBackendLoadMetricStats.numRequestsFinishedWithMetric() + + newBackendLoadMetricStats.numRequestsFinishedWithMetric(), + oldBackendLoadMetricStats.totalMetricValue() + + newBackendLoadMetricStats.totalMetricValue())); + } + /** * Release the hard reference for this stats object (previously obtained via {@link * LoadStatsManager2#getClusterLocalityStats}). The object may still be @@ -368,7 +384,8 @@ private ClusterLocalityStatsSnapshot snapshot() { long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS); stopwatch.reset().start(); return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(), - callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration); + callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration, + Collections.unmodifiableMap(loadMetricStatsMap.getAndSet(new ConcurrentHashMap<>()))); } } @@ -378,15 +395,17 @@ private static final class ClusterLocalityStatsSnapshot { private final long callsFailed; private final long callsIssued; private final long durationNano; + private final Map loadMetricStatsMap; private ClusterLocalityStatsSnapshot( long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued, - long durationNano) { + long durationNano, Map loadMetricStatsMap) { this.callsSucceeded = callsSucceeded; this.callsInProgress = callsInProgress; this.callsFailed = callsFailed; this.callsIssued = callsIssued; this.durationNano = durationNano; + this.loadMetricStatsMap = checkNotNull(loadMetricStatsMap, "loadMetricStatsMap"); } } } diff --git a/xds/src/main/java/io/grpc/xds/Stats.java b/xds/src/main/java/io/grpc/xds/Stats.java index 7e5fa8639d4..5606b0a4d49 100644 --- a/xds/src/main/java/io/grpc/xds/Stats.java +++ b/xds/src/main/java/io/grpc/xds/Stats.java @@ -18,6 +18,8 @@ import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Map; import javax.annotation.Nullable; /** Represents client load stats. */ @@ -101,10 +103,31 @@ abstract static class UpstreamLocalityStats { abstract long totalRequestsInProgress(); + abstract ImmutableMap loadMetricStatsMap(); + static UpstreamLocalityStats create(Locality locality, long totalIssuedRequests, - long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress) { + long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress, + Map loadMetricStatsMap) { return new AutoValue_Stats_UpstreamLocalityStats(locality, totalIssuedRequests, - totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress); + totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress, + ImmutableMap.copyOf(loadMetricStatsMap)); + } + } + + /** + * Load metric stats for multi-dimensional load balancing. + */ + @AutoValue + abstract static class BackendLoadMetricStats { + + abstract long numRequestsFinishedWithMetric(); + + abstract double totalMetricValue(); + + static BackendLoadMetricStats create(long numRequestsFinishedWithMetric, + double totalMetricValue) { + return new AutoValue_Stats_BackendLoadMetricStats(numRequestsFinishedWithMetric, + totalMetricValue); } } } From 363fb5064b944b7c67fec263c29406d32fc6195c Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Wed, 14 Jun 2023 16:17:34 +0000 Subject: [PATCH 4/9] added unit tests --- .../io/grpc/xds/LoadStatsManager2Test.java | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java b/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java index 0cfb7f46a22..a86c330225e 100644 --- a/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java +++ b/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java @@ -39,6 +39,7 @@ */ @RunWith(JUnit4.class) public class LoadStatsManager2Test { + private static final double TOLERANCE = 1.0e-10; private static final String CLUSTER_NAME1 = "cluster-foo.googleapis.com"; private static final String CLUSTER_NAME2 = "cluster-bar.googleapis.com"; private static final String EDS_SERVICE_NAME1 = "backend-service-foo.googleapis.com"; @@ -71,12 +72,18 @@ public void recordAndGetReport() { for (int i = 0; i < 19; i++) { loadCounter1.recordCallStarted(); } + loadCounter1.recordBackendLoadMetricStats("named1", 3.14159); + loadCounter1.recordBackendLoadMetricStats("named1", 1.618); + loadCounter1.recordBackendLoadMetricStats("named1", 99); + loadCounter1.recordBackendLoadMetricStats("named1", -97.23); + loadCounter1.recordBackendLoadMetricStats("named2", -2.718); fakeClock.forwardTime(5L, TimeUnit.SECONDS); dropCounter2.recordDroppedRequest(); loadCounter1.recordCallFinished(Status.OK); for (int i = 0; i < 9; i++) { loadCounter2.recordCallStarted(); } + loadCounter2.recordBackendLoadMetricStats("named3", 0.0009); loadCounter2.recordCallFinished(Status.UNAVAILABLE); fakeClock.forwardTime(10L, TimeUnit.SECONDS); loadCounter3.recordCallStarted(); @@ -96,6 +103,18 @@ public void recordAndGetReport() { assertThat(loadStats1.totalSuccessfulRequests()).isEqualTo(1L); assertThat(loadStats1.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats1.totalRequestsInProgress()).isEqualTo(19L - 1L); + assertThat(loadStats1.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat(loadStats1.loadMetricStatsMap().containsKey("named2")).isTrue(); + assertThat( + loadStats1.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 4L); + assertThat(loadStats1.loadMetricStatsMap().get("named1").totalMetricValue()).isWithin(TOLERANCE) + .of(3.14159 + 1.618 + 99 - 97.23); + assertThat( + loadStats1.loadMetricStatsMap().get("named2").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(loadStats1.loadMetricStatsMap().get("named2").totalMetricValue()).isWithin(TOLERANCE) + .of(-2.718); UpstreamLocalityStats loadStats2 = findLocalityStats(stats1.upstreamLocalityStatsList(), LOCALITY2); @@ -103,6 +122,12 @@ public void recordAndGetReport() { assertThat(loadStats2.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats2.totalErrorRequests()).isEqualTo(1L); assertThat(loadStats2.totalRequestsInProgress()).isEqualTo(9L - 1L); + assertThat(loadStats2.loadMetricStatsMap().containsKey("named3")).isTrue(); + assertThat( + loadStats2.loadMetricStatsMap().get("named3").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(loadStats2.loadMetricStatsMap().get("named3").totalMetricValue()).isWithin(TOLERANCE) + .of(0.0009); ClusterStats stats2 = findClusterStats(allStats, CLUSTER_NAME1, EDS_SERVICE_NAME2); assertThat(stats2.loadReportIntervalNano()).isEqualTo(TimeUnit.SECONDS.toNanos(5L + 10L)); @@ -121,6 +146,7 @@ public void recordAndGetReport() { assertThat(loadStats3.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats3.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats3.totalRequestsInProgress()).isEqualTo(1L); + assertThat(loadStats3.loadMetricStatsMap()).isEmpty(); fakeClock.forwardTime(3L, TimeUnit.SECONDS); List clusterStatsList = loadStatsManager.getClusterStatsReports(CLUSTER_NAME1); @@ -135,11 +161,13 @@ public void recordAndGetReport() { assertThat(loadStats1.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats1.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats1.totalRequestsInProgress()).isEqualTo(18L); // still in-progress + assertThat(loadStats1.loadMetricStatsMap()).isEmpty(); loadStats2 = findLocalityStats(stats1.upstreamLocalityStatsList(), LOCALITY2); assertThat(loadStats2.totalIssuedRequests()).isEqualTo(0L); assertThat(loadStats2.totalSuccessfulRequests()).isEqualTo(0L); assertThat(loadStats2.totalErrorRequests()).isEqualTo(0L); assertThat(loadStats2.totalRequestsInProgress()).isEqualTo(8L); // still in-progress + assertThat(loadStats2.loadMetricStatsMap()).isEmpty(); stats2 = findClusterStats(clusterStatsList, CLUSTER_NAME1, EDS_SERVICE_NAME2); assertThat(stats2.loadReportIntervalNano()).isEqualTo(TimeUnit.SECONDS.toNanos(3L)); @@ -194,9 +222,13 @@ public void sharedLoadCounterStatsAggregation() { ClusterLocalityStats ref2 = loadStatsManager.getClusterLocalityStats( CLUSTER_NAME1, EDS_SERVICE_NAME1, LOCALITY1); ref1.recordCallStarted(); + ref1.recordBackendLoadMetricStats("named1", 1.618); + ref1.recordBackendLoadMetricStats("named1", 3.14159); ref1.recordCallFinished(Status.OK); ref2.recordCallStarted(); ref2.recordCallStarted(); + ref2.recordBackendLoadMetricStats("named1", -1); + ref2.recordBackendLoadMetricStats("named2", 2.718); ref2.recordCallFinished(Status.UNAVAILABLE); ClusterStats stats = Iterables.getOnlyElement( @@ -207,6 +239,18 @@ public void sharedLoadCounterStatsAggregation() { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(1L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(1L + 2L - 1L - 1L); + assertThat(localityStats.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat(localityStats.loadMetricStatsMap().containsKey("named2")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 3L); + assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isWithin( + TOLERANCE).of(1.618 + 3.14159 - 1); + assertThat( + localityStats.loadMetricStatsMap().get("named2").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(localityStats.loadMetricStatsMap().get("named2").totalMetricValue()).isEqualTo( + 2.718); } @Test @@ -215,6 +259,8 @@ public void loadCounterDelayedDeletionAfterAllInProgressRequestsReported() { CLUSTER_NAME1, EDS_SERVICE_NAME1, LOCALITY1); counter.recordCallStarted(); counter.recordCallStarted(); + counter.recordBackendLoadMetricStats("named1", 2.718); + counter.recordBackendLoadMetricStats("named1", 1.414); ClusterStats stats = Iterables.getOnlyElement( loadStatsManager.getClusterStatsReports(CLUSTER_NAME1)); @@ -224,6 +270,12 @@ public void loadCounterDelayedDeletionAfterAllInProgressRequestsReported() { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(2L); + assertThat(localityStats.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 2L); + assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isEqualTo( + 2.718 + 1.414); // release the counter, but requests still in-flight counter.release(); @@ -234,6 +286,7 @@ public void loadCounterDelayedDeletionAfterAllInProgressRequestsReported() { assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); assertThat(localityStats.totalRequestsInProgress()) .isEqualTo(2L); // retained by in-flight calls + assertThat(localityStats.loadMetricStatsMap().isEmpty()).isTrue(); counter.recordCallFinished(Status.OK); counter.recordCallFinished(Status.UNAVAILABLE); @@ -243,6 +296,7 @@ public void loadCounterDelayedDeletionAfterAllInProgressRequestsReported() { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(1L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(0L); + assertThat(localityStats.loadMetricStatsMap().isEmpty()).isTrue(); assertThat(loadStatsManager.getClusterStatsReports(CLUSTER_NAME1)).isEmpty(); } From f38136101f7c9b265d5d9a130b0e9bd865396f7e Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 15 Jun 2023 01:11:50 +0000 Subject: [PATCH 5/9] added more tests --- .../grpc/services/CallMetricRecorderTest.java | 1 + .../grpc/xds/ClusterImplLoadBalancerTest.java | 38 +++++++++++++++++++ .../io/grpc/xds/LoadReportClientTest.java | 28 ++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java index 4ef06047f4d..f60446b1a3a 100644 --- a/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java +++ b/services/src/test/java/io/grpc/services/CallMetricRecorderTest.java @@ -79,6 +79,7 @@ public void noMetricsRecordedAfterSnapshot() { recorder.recordUtilizationMetric("cost", 0.154353423); recorder.recordQpsMetric(3.14159); recorder.recordEpsMetric(1.618); + recorder.recordNamedMetric("named1", 2.718); assertThat(recorder.finalizeAndDump()).isEqualTo(initDump); } diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 2ab101b730d..abe1529fe67 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.github.xds.data.orca.v3.OrcaLoadReport; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Attributes; @@ -45,6 +46,7 @@ import io.grpc.internal.FakeClock; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; +import io.grpc.protobuf.ProtoUtils; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; import io.grpc.xds.Endpoints.DropOverload; @@ -93,6 +95,10 @@ public class ClusterImplLoadBalancerTest { private static final String EDS_SERVICE_NAME = "service.googleapis.com"; private static final ServerInfo LRS_SERVER_INFO = ServerInfo.create("api.google.com", InsecureChannelCredentials.create()); + private static final Metadata.Key ORCA_ENDPOINT_LOAD_METRICS_KEY = + Metadata.Key.of( + "endpoint-load-metrics-bin", + ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance())); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @Override @@ -255,7 +261,20 @@ public void recordLoadStats() { ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // second RPC call ClientStreamTracer streamTracer3 = result.getStreamTracerFactory().newClientStreamTracer( ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // third RPC call + // When the trailer contains an ORCA report, the listener callback will be invoked. + Metadata trailersWithOrcaLoadReport1 = new Metadata(); + trailersWithOrcaLoadReport1.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, + OrcaLoadReport.newBuilder().setApplicationUtilization(1.414).setMemUtilization(0.034) + .setRpsFractional(1.414).putNamedMetrics("named1", 3.14159) + .putNamedMetrics("named2", -1.618).build()); + streamTracer1.inboundTrailers(trailersWithOrcaLoadReport1); streamTracer1.streamClosed(Status.OK); + Metadata trailersWithOrcaLoadReport2 = new Metadata(); + trailersWithOrcaLoadReport2.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, + OrcaLoadReport.newBuilder().setApplicationUtilization(0.99).setMemUtilization(0.123) + .setRpsFractional(0.905).putNamedMetrics("named1", 2.718) + .putNamedMetrics("named3", 0.009).build()); + streamTracer2.inboundTrailers(trailersWithOrcaLoadReport2); streamTracer2.streamClosed(Status.UNAVAILABLE); ClusterStats clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); @@ -266,6 +285,24 @@ public void recordLoadStats() { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(1L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(1L); + assertThat(localityStats.loadMetricStatsMap().containsKey("named1")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( + 2L); + assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isEqualTo( + 3.14159 + 2.718); + assertThat(localityStats.loadMetricStatsMap().containsKey("named2")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named2").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(localityStats.loadMetricStatsMap().get("named2").totalMetricValue()).isEqualTo( + -1.618); + assertThat(localityStats.loadMetricStatsMap().containsKey("named3")).isTrue(); + assertThat( + localityStats.loadMetricStatsMap().get("named3").numRequestsFinishedWithMetric()).isEqualTo( + 1L); + assertThat(localityStats.loadMetricStatsMap().get("named3").totalMetricValue()).isEqualTo( + 0.009); streamTracer3.streamClosed(Status.OK); subchannel.shutdown(); // stats recorder released @@ -278,6 +315,7 @@ public void recordLoadStats() { assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats.totalErrorRequests()).isEqualTo(0L); assertThat(localityStats.totalRequestsInProgress()).isEqualTo(0L); + assertThat(localityStats.loadMetricStatsMap().isEmpty()).isTrue(); clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); assertThat(clusterStats.upstreamLocalityStatsList()).isEmpty(); // no longer reported diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 3c3a11b3cd0..10716811439 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -35,6 +35,7 @@ import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; +import io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats; import io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats; import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; @@ -198,11 +199,15 @@ private void addFakeStatsData() { for (int i = 0; i < 31; i++) { localityStats1.recordCallStarted(); } + localityStats1.recordBackendLoadMetricStats("named1", 3.14159); + localityStats1.recordBackendLoadMetricStats("named1", 1.618); + localityStats1.recordBackendLoadMetricStats("named1", -2.718); ClusterLocalityStats localityStats2 = loadStatsManager.getClusterLocalityStats(CLUSTER2, EDS_SERVICE_NAME2, LOCALITY2); for (int i = 0; i < 45; i++) { localityStats2.recordCallStarted(); } + localityStats2.recordBackendLoadMetricStats("named2", 1.414); localityStats2.recordCallFinished(Status.OK); } @@ -245,6 +250,12 @@ public void periodicLoadReporting() { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsCount()).isEqualTo(1); + EndpointLoadMetricStats loadMetricStats = Iterables.getOnlyElement( + localityStats.getLoadMetricStatsList()); + assertThat(loadMetricStats.getMetricName()).isEqualTo("named1"); + assertThat(loadMetricStats.getNumRequestsFinishedWithMetric()).isEqualTo(3L); + assertThat(loadMetricStats.getTotalMetricValue()).isEqualTo(3.14159 + 1.618 - 2.718); fakeClock.forwardTime(10L, TimeUnit.SECONDS); verify(requestObserver, times(3)).onNext(requestCaptor.capture()); @@ -263,6 +274,7 @@ public void periodicLoadReporting() { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); // Management server updates the interval of sending load reports, while still asking for // loads to cluster1 only. @@ -287,6 +299,7 @@ public void periodicLoadReporting() { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); // Management server asks to report loads for all clusters. responseObserver.onNext(LoadStatsResponse.newBuilder().setSendAllClusters(true) @@ -309,6 +322,7 @@ public void periodicLoadReporting() { assertThat(localityStats1.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats1.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats1.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats1.getLoadMetricStatsList()).isEmpty(); ClusterStats clusterStats2 = findClusterStats(request.getClusterStatsList(), CLUSTER2); assertThat(Durations.toSeconds(clusterStats2.getLoadReportInterval())) .isEqualTo(10L + 10L + 20L + 20L); @@ -326,6 +340,12 @@ public void periodicLoadReporting() { assertThat(localityStats2.getTotalSuccessfulRequests()).isEqualTo(1L); assertThat(localityStats2.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats2.getTotalRequestsInProgress()).isEqualTo(45L - 1L); + assertThat(localityStats2.getLoadMetricStatsCount()).isEqualTo(1); + EndpointLoadMetricStats loadMetricStats2 = Iterables.getOnlyElement( + localityStats2.getLoadMetricStatsList()); + assertThat(loadMetricStats2.getMetricName()).isEqualTo("named2"); + assertThat(loadMetricStats2.getNumRequestsFinishedWithMetric()).isEqualTo(1L); + assertThat(loadMetricStats2.getTotalMetricValue()).isEqualTo(1.414); // Load reports for cluster1 is no longer wanted. responseObserver.onNext(LoadStatsResponse.newBuilder().addClusters(CLUSTER2) @@ -348,6 +368,7 @@ public void periodicLoadReporting() { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(44L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); fakeClock.forwardTime(10L, TimeUnit.SECONDS); verify(requestObserver, times(7)).onNext(requestCaptor.capture()); @@ -366,6 +387,7 @@ public void periodicLoadReporting() { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(44L); + assertThat(localityStats.getLoadMetricStatsList()).isEmpty(); // Management server asks loads for a cluster that client has no load data. responseObserver.onNext(LoadStatsResponse.newBuilder().addClusters("unknown.googleapis.com") @@ -495,6 +517,12 @@ public void lrsStreamClosedAndRetried() { assertThat(localityStats.getTotalSuccessfulRequests()).isEqualTo(0L); assertThat(localityStats.getTotalErrorRequests()).isEqualTo(0L); assertThat(localityStats.getTotalRequestsInProgress()).isEqualTo(31L); + assertThat(localityStats.getLoadMetricStatsCount()).isEqualTo(1); + EndpointLoadMetricStats loadMetricStats = Iterables.getOnlyElement( + localityStats.getLoadMetricStatsList()); + assertThat(loadMetricStats.getMetricName()).isEqualTo("named1"); + assertThat(loadMetricStats.getNumRequestsFinishedWithMetric()).isEqualTo(3L); + assertThat(loadMetricStats.getTotalMetricValue()).isEqualTo(3.14159 + 1.618 - 2.718); // Wrapping up verify(backoffPolicyProvider, times(2)).get(); From 8eb497b548352db370f86bb8aedc188e9388d57a Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Fri, 30 Jun 2023 00:52:23 +0000 Subject: [PATCH 6/9] added comment for recordBackendLoadMetricStats() to perform increment/update or create entry atomically --- xds/src/main/java/io/grpc/xds/LoadStatsManager2.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java index 4dce0f9d51e..84715e8ad4f 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java @@ -358,7 +358,11 @@ void recordCallFinished(Status status) { } /** - * Records a custom named backend load metric stat for per-call load reporting. + * Records a custom named backend load metric stat for per-call load reporting. For the metric + * key {@code name}, creates a new {@link BackendLoadMetricStats} with a finished requests + * counter of 1 and the {@code value} if the key is not present in the map. Otherwise, + * increments the finished requests counter and adds the {@code value} to the existing + * {@link BackendLoadMetricStats}. This entire method is performed atomically in one operation. */ void recordBackendLoadMetricStats(String name, double value) { loadMetricStatsMap.get().merge(name, BackendLoadMetricStats.create(1, value), From 7eb37343476b3cf38ac3d9ff819f9cc61a2b222d Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 6 Jul 2023 22:39:32 +0000 Subject: [PATCH 7/9] BackendLoadMetricStats mutable, recordBackendLoadMetricStats(Map) and snapshot() synchronized --- .../io/grpc/xds/ClusterImplLoadBalancer.java | 5 +-- .../java/io/grpc/xds/LoadStatsManager2.java | 39 +++++++++++-------- xds/src/main/java/io/grpc/xds/Stats.java | 30 ++++++++++---- .../io/grpc/xds/LoadReportClientTest.java | 8 ++-- .../io/grpc/xds/LoadStatsManager2Test.java | 23 ++++++----- 5 files changed, 60 insertions(+), 45 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index d61adf02ee4..b2be811d508 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -53,7 +53,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -407,9 +406,7 @@ private OrcaPerRpcListener(ClusterLocalityStats stats) { */ @Override public void onLoadReport(MetricReport report) { - for (Map.Entry entry : report.getNamedMetrics().entrySet()) { - stats.recordBackendLoadMetricStats(entry.getKey(), entry.getValue()); - } + stats.recordBackendLoadMetricStats(report.getNamedMetrics()); } } } diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java index 84715e8ad4f..e5320ef82cc 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java @@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.grpc.Status; import io.grpc.xds.Stats.BackendLoadMetricStats; @@ -38,7 +39,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -324,8 +324,7 @@ final class ClusterLocalityStats { private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); private final AtomicLong callsIssued = new AtomicLong(); - private final AtomicReference> - loadMetricStatsMap = new AtomicReference<>(new ConcurrentHashMap<>()); + private final Map loadMetricStatsMap = new HashMap<>(); private ClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality, @@ -358,19 +357,20 @@ void recordCallFinished(Status status) { } /** - * Records a custom named backend load metric stat for per-call load reporting. For the metric - * key {@code name}, creates a new {@link BackendLoadMetricStats} with a finished requests - * counter of 1 and the {@code value} if the key is not present in the map. Otherwise, + * Records all custom named backend load metric stats for per-call load reporting. For each + * metric key {@code name}, creates a new {@link BackendLoadMetricStats} with a finished + * requests counter of 1 and the {@code value} if the key is not present in the map. Otherwise, * increments the finished requests counter and adds the {@code value} to the existing - * {@link BackendLoadMetricStats}. This entire method is performed atomically in one operation. + * {@link BackendLoadMetricStats}. */ - void recordBackendLoadMetricStats(String name, double value) { - loadMetricStatsMap.get().merge(name, BackendLoadMetricStats.create(1, value), - (oldBackendLoadMetricStats, newBackendLoadMetricStats) -> BackendLoadMetricStats.create( - oldBackendLoadMetricStats.numRequestsFinishedWithMetric() - + newBackendLoadMetricStats.numRequestsFinishedWithMetric(), - oldBackendLoadMetricStats.totalMetricValue() - + newBackendLoadMetricStats.totalMetricValue())); + synchronized void recordBackendLoadMetricStats(Map namedMetrics) { + namedMetrics.forEach((name, value) -> { + if (!loadMetricStatsMap.containsKey(name)) { + loadMetricStatsMap.put(name, new BackendLoadMetricStats(1, value)); + } else { + loadMetricStatsMap.get(name).addMetricValueAndIncrementRequestsFinished(value); + } + }); } /** @@ -387,9 +387,13 @@ void release() { private ClusterLocalityStatsSnapshot snapshot() { long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS); stopwatch.reset().start(); + ImmutableMap loadMetricStatsMapCopy; + synchronized (this) { + loadMetricStatsMapCopy = ImmutableMap.copyOf(loadMetricStatsMap); + loadMetricStatsMap.clear(); + } return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(), - callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration, - Collections.unmodifiableMap(loadMetricStatsMap.getAndSet(new ConcurrentHashMap<>()))); + callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration, loadMetricStatsMapCopy); } } @@ -409,7 +413,8 @@ private ClusterLocalityStatsSnapshot( this.callsFailed = callsFailed; this.callsIssued = callsIssued; this.durationNano = durationNano; - this.loadMetricStatsMap = checkNotNull(loadMetricStatsMap, "loadMetricStatsMap"); + this.loadMetricStatsMap = Collections.unmodifiableMap( + checkNotNull(loadMetricStatsMap, "loadMetricStatsMap")); } } } diff --git a/xds/src/main/java/io/grpc/xds/Stats.java b/xds/src/main/java/io/grpc/xds/Stats.java index 5606b0a4d49..5953f088448 100644 --- a/xds/src/main/java/io/grpc/xds/Stats.java +++ b/xds/src/main/java/io/grpc/xds/Stats.java @@ -117,17 +117,31 @@ static UpstreamLocalityStats create(Locality locality, long totalIssuedRequests, /** * Load metric stats for multi-dimensional load balancing. */ - @AutoValue - abstract static class BackendLoadMetricStats { + static final class BackendLoadMetricStats { + + private long numRequestsFinishedWithMetric; + private double totalMetricValue; + + BackendLoadMetricStats(long numRequestsFinishedWithMetric, double totalMetricValue) { + this.numRequestsFinishedWithMetric = numRequestsFinishedWithMetric; + this.totalMetricValue = totalMetricValue; + } - abstract long numRequestsFinishedWithMetric(); + public long numRequestsFinishedWithMetric() { + return numRequestsFinishedWithMetric; + } - abstract double totalMetricValue(); + public double totalMetricValue() { + return totalMetricValue; + } - static BackendLoadMetricStats create(long numRequestsFinishedWithMetric, - double totalMetricValue) { - return new AutoValue_Stats_BackendLoadMetricStats(numRequestsFinishedWithMetric, - totalMetricValue); + /** + * Adds the given {@code metricValue} and increments the number of requests finished counter for + * the existing {@link BackendLoadMetricStats}. + */ + public void addMetricValueAndIncrementRequestsFinished(double metricValue) { + numRequestsFinishedWithMetric += 1; + totalMetricValue += metricValue; } } } diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 10716811439..910a9fc3285 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -199,15 +199,15 @@ private void addFakeStatsData() { for (int i = 0; i < 31; i++) { localityStats1.recordCallStarted(); } - localityStats1.recordBackendLoadMetricStats("named1", 3.14159); - localityStats1.recordBackendLoadMetricStats("named1", 1.618); - localityStats1.recordBackendLoadMetricStats("named1", -2.718); + localityStats1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 3.14159)); + localityStats1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.618)); + localityStats1.recordBackendLoadMetricStats(ImmutableMap.of("named1", -2.718)); ClusterLocalityStats localityStats2 = loadStatsManager.getClusterLocalityStats(CLUSTER2, EDS_SERVICE_NAME2, LOCALITY2); for (int i = 0; i < 45; i++) { localityStats2.recordCallStarted(); } - localityStats2.recordBackendLoadMetricStats("named2", 1.414); + localityStats2.recordBackendLoadMetricStats(ImmutableMap.of("named2", 1.414)); localityStats2.recordCallFinished(Status.OK); } diff --git a/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java b/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java index a86c330225e..0389fa74acc 100644 --- a/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java +++ b/xds/src/test/java/io/grpc/xds/LoadStatsManager2Test.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Status; import io.grpc.internal.FakeClock; @@ -72,18 +73,17 @@ public void recordAndGetReport() { for (int i = 0; i < 19; i++) { loadCounter1.recordCallStarted(); } - loadCounter1.recordBackendLoadMetricStats("named1", 3.14159); - loadCounter1.recordBackendLoadMetricStats("named1", 1.618); - loadCounter1.recordBackendLoadMetricStats("named1", 99); - loadCounter1.recordBackendLoadMetricStats("named1", -97.23); - loadCounter1.recordBackendLoadMetricStats("named2", -2.718); + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 3.14159)); + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.618)); + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 99.0)); + loadCounter1.recordBackendLoadMetricStats(ImmutableMap.of("named1", -97.23, "named2", -2.718)); fakeClock.forwardTime(5L, TimeUnit.SECONDS); dropCounter2.recordDroppedRequest(); loadCounter1.recordCallFinished(Status.OK); for (int i = 0; i < 9; i++) { loadCounter2.recordCallStarted(); } - loadCounter2.recordBackendLoadMetricStats("named3", 0.0009); + loadCounter2.recordBackendLoadMetricStats(ImmutableMap.of("named3", 0.0009)); loadCounter2.recordCallFinished(Status.UNAVAILABLE); fakeClock.forwardTime(10L, TimeUnit.SECONDS); loadCounter3.recordCallStarted(); @@ -222,13 +222,12 @@ public void sharedLoadCounterStatsAggregation() { ClusterLocalityStats ref2 = loadStatsManager.getClusterLocalityStats( CLUSTER_NAME1, EDS_SERVICE_NAME1, LOCALITY1); ref1.recordCallStarted(); - ref1.recordBackendLoadMetricStats("named1", 1.618); - ref1.recordBackendLoadMetricStats("named1", 3.14159); + ref1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.618)); + ref1.recordBackendLoadMetricStats(ImmutableMap.of("named1", 3.14159)); ref1.recordCallFinished(Status.OK); ref2.recordCallStarted(); ref2.recordCallStarted(); - ref2.recordBackendLoadMetricStats("named1", -1); - ref2.recordBackendLoadMetricStats("named2", 2.718); + ref2.recordBackendLoadMetricStats(ImmutableMap.of("named1", -1.0, "named2", 2.718)); ref2.recordCallFinished(Status.UNAVAILABLE); ClusterStats stats = Iterables.getOnlyElement( @@ -259,8 +258,8 @@ public void loadCounterDelayedDeletionAfterAllInProgressRequestsReported() { CLUSTER_NAME1, EDS_SERVICE_NAME1, LOCALITY1); counter.recordCallStarted(); counter.recordCallStarted(); - counter.recordBackendLoadMetricStats("named1", 2.718); - counter.recordBackendLoadMetricStats("named1", 1.414); + counter.recordBackendLoadMetricStats(ImmutableMap.of("named1", 2.718)); + counter.recordBackendLoadMetricStats(ImmutableMap.of("named1", 1.414)); ClusterStats stats = Iterables.getOnlyElement( loadStatsManager.getClusterStatsReports(CLUSTER_NAME1)); From a6a584a3ba7d055754b0b23e3cc7319515cff807 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Fri, 7 Jul 2023 01:14:14 +0000 Subject: [PATCH 8/9] replace backend metrics map with new one in snapshot(), copy with unmodifiableMap() --- xds/src/main/java/io/grpc/xds/LoadStatsManager2.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java index e5320ef82cc..e51c6eceeb5 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsManager2.java @@ -21,7 +21,6 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import io.grpc.Status; import io.grpc.xds.Stats.BackendLoadMetricStats; @@ -324,7 +323,7 @@ final class ClusterLocalityStats { private final AtomicLong callsSucceeded = new AtomicLong(); private final AtomicLong callsFailed = new AtomicLong(); private final AtomicLong callsIssued = new AtomicLong(); - private final Map loadMetricStatsMap = new HashMap<>(); + private Map loadMetricStatsMap = new HashMap<>(); private ClusterLocalityStats( String clusterName, @Nullable String edsServiceName, Locality locality, @@ -387,10 +386,10 @@ void release() { private ClusterLocalityStatsSnapshot snapshot() { long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS); stopwatch.reset().start(); - ImmutableMap loadMetricStatsMapCopy; + Map loadMetricStatsMapCopy; synchronized (this) { - loadMetricStatsMapCopy = ImmutableMap.copyOf(loadMetricStatsMap); - loadMetricStatsMap.clear(); + loadMetricStatsMapCopy = Collections.unmodifiableMap(loadMetricStatsMap); + loadMetricStatsMap = new HashMap<>(); } return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(), callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration, loadMetricStatsMapCopy); From 7bb2cbcbd05d0e849d5e1afb2b380d5489679923 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 11 Jul 2023 02:01:42 +0000 Subject: [PATCH 9/9] updated ClusterImplLoadBalancerTest with float tolerance equality --- .../io/grpc/xds/ClusterImplLoadBalancerTest.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index abe1529fe67..7842967c0a5 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -90,6 +90,7 @@ public class ClusterImplLoadBalancerTest { @Rule public final MockitoRule mocks = MockitoJUnit.rule(); + private static final double TOLERANCE = 1.0e-10; private static final String AUTHORITY = "api.google.com"; private static final String CLUSTER = "cluster-foo.googleapis.com"; private static final String EDS_SERVICE_NAME = "service.googleapis.com"; @@ -273,6 +274,7 @@ public void recordLoadStats() { trailersWithOrcaLoadReport2.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, OrcaLoadReport.newBuilder().setApplicationUtilization(0.99).setMemUtilization(0.123) .setRpsFractional(0.905).putNamedMetrics("named1", 2.718) + .putNamedMetrics("named2", 1.414) .putNamedMetrics("named3", 0.009).build()); streamTracer2.inboundTrailers(trailersWithOrcaLoadReport2); streamTracer2.streamClosed(Status.UNAVAILABLE); @@ -289,20 +291,20 @@ public void recordLoadStats() { assertThat( localityStats.loadMetricStatsMap().get("named1").numRequestsFinishedWithMetric()).isEqualTo( 2L); - assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isEqualTo( - 3.14159 + 2.718); + assertThat(localityStats.loadMetricStatsMap().get("named1").totalMetricValue()).isWithin( + TOLERANCE).of(3.14159 + 2.718); assertThat(localityStats.loadMetricStatsMap().containsKey("named2")).isTrue(); assertThat( localityStats.loadMetricStatsMap().get("named2").numRequestsFinishedWithMetric()).isEqualTo( - 1L); - assertThat(localityStats.loadMetricStatsMap().get("named2").totalMetricValue()).isEqualTo( - -1.618); + 2L); + assertThat(localityStats.loadMetricStatsMap().get("named2").totalMetricValue()).isWithin( + TOLERANCE).of(-1.618 + 1.414); assertThat(localityStats.loadMetricStatsMap().containsKey("named3")).isTrue(); assertThat( localityStats.loadMetricStatsMap().get("named3").numRequestsFinishedWithMetric()).isEqualTo( 1L); - assertThat(localityStats.loadMetricStatsMap().get("named3").totalMetricValue()).isEqualTo( - 0.009); + assertThat(localityStats.loadMetricStatsMap().get("named3").totalMetricValue()).isWithin( + TOLERANCE).of(0.009); streamTracer3.streamClosed(Status.OK); subchannel.shutdown(); // stats recorder released