From cc0c452be94a1579dff9920a6b94f3c10822132e Mon Sep 17 00:00:00 2001 From: Michael Li Date: Wed, 27 Sep 2023 10:12:58 -0700 Subject: [PATCH] Filter Out Metrics with NoRecordedValue Flag Set (#157) Metrics that contain the NoRecordedValue Flag are being written to Druid with a 0 value. We should properly handle them in the backend --- .../OpenTelemetryMetricsProtobufReader.java | 18 +++++++++++-- ...penTelemetryMetricsProtobufReaderTest.java | 27 +++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java index c5fa65ca28e2..2e0a62532b5d 100644 --- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import com.google.protobuf.InvalidProtocolBufferException; import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.metrics.v1.DataPointFlags; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; @@ -146,14 +147,22 @@ private List parseMetric(Metric metric, Map resourceAt inputRows = new ArrayList<>(metric.getSum().getDataPointsCount()); metric.getSum() .getDataPointsList() - .forEach(dataPoint -> inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName))); + .forEach(dataPoint -> { + if (hasRecordedValue(dataPoint)) { + inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName)); + } + }); break; } case GAUGE: { inputRows = new ArrayList<>(metric.getGauge().getDataPointsCount()); metric.getGauge() .getDataPointsList() - .forEach(dataPoint -> inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName))); + .forEach(dataPoint -> { + if (hasRecordedValue(dataPoint)) { + inputRows.add(parseNumberDataPoint(dataPoint, resourceAttributes, metricName)); + } + }); break; } // TODO Support HISTOGRAM and SUMMARY metrics @@ -167,6 +176,11 @@ private List parseMetric(Metric metric, Map resourceAt return inputRows; } + private static boolean hasRecordedValue(NumberDataPoint d) + { + return (d.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) == 0; + } + private InputRow parseNumberDataPoint(NumberDataPoint dataPoint, Map resourceAttributes, String metricName) diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java index 70c60bd00dd2..8044baf7dbe0 100644 --- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java @@ -23,6 +23,7 @@ import io.opentelemetry.proto.common.v1.AnyValue; import io.opentelemetry.proto.common.v1.KeyValue; import io.opentelemetry.proto.common.v1.KeyValueList; +import io.opentelemetry.proto.metrics.v1.DataPointFlags; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.MetricsData; import org.apache.druid.data.input.InputRow; @@ -404,6 +405,32 @@ public void testInvalidMetricType() Assert.assertEquals(0, rowList.size()); } + @Test + public void testNoRecordedValueMetric() + { + metricBuilder.setName("example_gauge") + .getGaugeBuilder() + .addDataPointsBuilder() + .setAsInt(6) + .setFlags(DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) + .setTimeUnixNano(TIMESTAMP); + + MetricsData metricsData = metricsDataBuilder.build(); + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(new ByteEntity(metricsData.toByteArray())); + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + settableByteEntity, + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + + Assert.assertFalse(rows.hasNext()); + } + private void assertDimensionEquals(InputRow row, String dimension, Object expected) { List values = row.getDimension(dimension);