diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java index 1ce97e587b3fe..249a5d096d96c 100644 --- a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java +++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java @@ -19,6 +19,8 @@ package org.apache.druid.data.input.opencensus.protobuf; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -42,6 +44,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -80,7 +83,17 @@ private interface LabelContext @Override public CloseableIterator read() { - return CloseableIterators.withEmptyBaggage(readAsList().iterator()); + Supplier> supplier = Suppliers.memoize(() -> readAsList().iterator()); + return CloseableIterators.withEmptyBaggage(new Iterator() { + @Override + public boolean hasNext() { + return supplier.get().hasNext(); + } + @Override + public InputRow next() { + return supplier.get().next(); + } + }); } List readAsList() diff --git a/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReaderTest.java similarity index 93% rename from extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenTelemetryMetricsProtobufReaderTest.java rename to extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReaderTest.java index 0c8902df560b6..43ef6d455d85a 100644 --- a/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenTelemetryMetricsProtobufReaderTest.java +++ b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReaderTest.java @@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; @@ -52,7 +53,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; -public class OpenTelemetryMetricsProtobufReaderTest +public class OpenCensusProtobufReaderTest { private static final long TIMESTAMP = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z").toEpochMilli()); public static final String RESOURCE_ATTRIBUTE_COUNTRY = "country"; @@ -325,6 +326,27 @@ public void testDimensionSpecExclusions() throws IOException Assert.assertFalse(row.getDimensions().contains("descriptor.color")); } + @Test + public void testInvalidProtobuf() throws IOException { + byte[] invalidProtobuf = new byte[] { 0x00, 0x01 }; + ConsumerRecord consumerRecord = new ConsumerRecord(TOPIC, PARTITION, OFFSET, TS, TSTYPE, + -1L, -1, -1, null, invalidProtobuf, HEADERS); + KafkaRecordEntity kafkaRecordEntity = new KafkaRecordEntity(consumerRecord); + OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat("metric.name", + null, + "descriptor.", + "custom."); + + CloseableIterator rows = inputFormat.createReader(new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + dimensionsSpec, + ColumnsFilter.all() + ), kafkaRecordEntity, null).read(); + + Assert.assertThrows(ParseException.class, () -> rows.hasNext()); + Assert.assertThrows(ParseException.class, () -> rows.next()); + } + private void assertDimensionEquals(InputRow row, String dimension, Object expected) { List values = row.getDimension(dimension); diff --git a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java index 20f9cb533891a..f5d191cbee372 100644 --- a/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java +++ b/extensions-contrib/opentelemetry-extensions/src/main/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReader.java @@ -19,6 +19,8 @@ package org.apache.druid.data.input.opentelemetry.protobuf; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.protobuf.InvalidProtocolBufferException; @@ -40,6 +42,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -47,7 +50,6 @@ public class OpenTelemetryMetricsProtobufReader implements InputEntityReader { - private final ByteEntity source; private final String metricDimension; private final String valueDimension; @@ -75,7 +77,17 @@ public OpenTelemetryMetricsProtobufReader( @Override public CloseableIterator read() { - return CloseableIterators.withEmptyBaggage(readAsList().iterator()); + Supplier> supplier = Suppliers.memoize(() -> readAsList().iterator()); + return CloseableIterators.withEmptyBaggage(new Iterator() { + @Override + public boolean hasNext() { + return supplier.get().hasNext(); + } + @Override + public InputRow next() { + return supplier.get().next(); + } + }); } List readAsList() diff --git a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java index a4d85ece1d79d..8e36a0b4fc35e 100644 --- a/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java +++ b/extensions-contrib/opentelemetry-extensions/src/test/java/org/apache/druid/data/input/opentelemetry/protobuf/OpenTelemetryMetricsProtobufReaderTest.java @@ -30,6 +30,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.ParseException; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -342,6 +343,21 @@ public void testUnsupportedValueTypes() assertDimensionEquals(row, "raw.value", "6"); } + @Test + public void testInvalidProtobuf() { + byte[] invalidProtobuf = new byte[] { 0x00, 0x01 }; + CloseableIterator rows = new OpenTelemetryMetricsProtobufReader( + dimensionsSpec, + new ByteEntity(invalidProtobuf), + "metric.name", + "raw.value", + "descriptor.", + "custom." + ).read(); + Assert.assertThrows(ParseException.class, () -> rows.hasNext()); + Assert.assertThrows(ParseException.class, () -> rows.next()); + } + private void assertDimensionEquals(InputRow row, String dimension, Object expected) { List values = row.getDimension(dimension);