[OBSDATA-334] Patch opencensus/opentelemetry parse exception (apache#99)
CodingParsley authored and kkonstantine committed Oct 7, 2022
1 parent 4922efd commit a6980e8
Showing 4 changed files with 67 additions and 4 deletions.
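
Summary of the change: both the OpenCensus and OpenTelemetry readers used to call readAsList() eagerly inside read(), so a malformed protobuf payload threw before the caller ever touched the returned iterator. The patch wraps readAsList() in a memoized supplier and hands back a lazy iterator, so the ParseException instead surfaces on the first hasNext()/next() call, presumably where Druid's per-record parse-error handling can observe it. Below is a minimal, self-contained sketch of the pattern; LazyReadSketch, parseAll(), and the RuntimeException are illustrative stand-ins (not from this codebase) for the reader, readAsList(), and Druid's ParseException.

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

import java.util.Iterator;
import java.util.List;

public class LazyReadSketch
{
  // Stand-in for readAsList(): parses the whole payload eagerly and may throw.
  private static List<String> parseAll()
  {
    throw new RuntimeException("invalid protobuf");  // simulated parse failure
  }

  // Mirrors the patched read(): nothing is parsed until the first
  // hasNext()/next() call on the returned iterator.
  public static Iterator<String> read()
  {
    Supplier<Iterator<String>> supplier = Suppliers.memoize(() -> parseAll().iterator());
    return new Iterator<String>()
    {
      @Override
      public boolean hasNext()
      {
        return supplier.get().hasNext();
      }

      @Override
      public String next()
      {
        return supplier.get().next();
      }
    };
  }

  public static void main(String[] args)
  {
    Iterator<String> rows = read();  // returns immediately, nothing parsed yet
    try {
      rows.hasNext();                // parsing happens here and throws
    }
    catch (RuntimeException e) {
      System.out.println("caught during iteration: " + e.getMessage());
    }
  }
}

Suppliers.memoize guarantees parseAll() runs at most once even though hasNext() and next() both call supplier.get(); that at-most-once behavior matters for the real readers, whose underlying ByteEntity buffer appears to be consumable only once.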
File 1 of 4

@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.opencensus.protobuf;
 
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -42,6 +44,7 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -80,7 +83,17 @@ private interface LabelContext
   @Override
   public CloseableIterator<InputRow> read()
   {
-    return CloseableIterators.withEmptyBaggage(readAsList().iterator());
+    Supplier<Iterator<InputRow>> supplier = Suppliers.memoize(() -> readAsList().iterator());
+    return CloseableIterators.withEmptyBaggage(new Iterator<InputRow>() {
+      @Override
+      public boolean hasNext() {
+        return supplier.get().hasNext();
+      }
+      @Override
+      public InputRow next() {
+        return supplier.get().next();
+      }
+    });
   }
 
   List<InputRow> readAsList()
File 2 of 4

@@ -32,6 +32,7 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -52,7 +53,7 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-public class OpenTelemetryMetricsProtobufReaderTest
+public class OpenCensusProtobufReaderTest
 {
   private static final long TIMESTAMP = TimeUnit.MILLISECONDS.toNanos(Instant.parse("2019-07-12T09:30:01.123Z").toEpochMilli());
   public static final String RESOURCE_ATTRIBUTE_COUNTRY = "country";
@@ -325,6 +326,27 @@ public void testDimensionSpecExclusions() throws IOException
     Assert.assertFalse(row.getDimensions().contains("descriptor.color"));
   }
 
+  @Test
+  public void testInvalidProtobuf() throws IOException {
+    byte[] invalidProtobuf = new byte[] { 0x00, 0x01 };
+    ConsumerRecord consumerRecord = new ConsumerRecord(TOPIC, PARTITION, OFFSET, TS, TSTYPE,
+        -1L, -1, -1, null, invalidProtobuf, HEADERS);
+    KafkaRecordEntity kafkaRecordEntity = new KafkaRecordEntity(consumerRecord);
+    OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat("metric.name",
+        null,
+        "descriptor.",
+        "custom.");
+
+    CloseableIterator<InputRow> rows = inputFormat.createReader(new InputRowSchema(
+        new TimestampSpec("timestamp", "iso", null),
+        dimensionsSpec,
+        ColumnsFilter.all()
+    ), kafkaRecordEntity, null).read();
+
+    Assert.assertThrows(ParseException.class, () -> rows.hasNext());
+    Assert.assertThrows(ParseException.class, () -> rows.next());
+  }
+
   private void assertDimensionEquals(InputRow row, String dimension, Object expected)
   {
     List<String> values = row.getDimension(dimension);
File 3 of 4

@@ -19,6 +19,8 @@
 
 package org.apache.druid.data.input.opentelemetry.protobuf;
 
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -40,14 +42,14 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class OpenTelemetryMetricsProtobufReader implements InputEntityReader
 {
-
   private final ByteEntity source;
   private final String metricDimension;
   private final String valueDimension;
@@ -75,7 +77,17 @@ public OpenTelemetryMetricsProtobufReader(
   @Override
   public CloseableIterator<InputRow> read()
   {
-    return CloseableIterators.withEmptyBaggage(readAsList().iterator());
+    Supplier<Iterator<InputRow>> supplier = Suppliers.memoize(() -> readAsList().iterator());
+    return CloseableIterators.withEmptyBaggage(new Iterator<InputRow>() {
+      @Override
+      public boolean hasNext() {
+        return supplier.get().hasNext();
+      }
+      @Override
+      public InputRow next() {
+        return supplier.get().next();
+      }
+    });
   }
 
   List<InputRow> readAsList()
File 4 of 4

@@ -30,6 +30,7 @@
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -342,6 +343,21 @@ public void testUnsupportedValueTypes()
     assertDimensionEquals(row, "raw.value", "6");
   }
 
+  @Test
+  public void testInvalidProtobuf() {
+    byte[] invalidProtobuf = new byte[] { 0x00, 0x01 };
+    CloseableIterator<InputRow> rows = new OpenTelemetryMetricsProtobufReader(
+        dimensionsSpec,
+        new ByteEntity(invalidProtobuf),
+        "metric.name",
+        "raw.value",
+        "descriptor.",
+        "custom."
+    ).read();
+    Assert.assertThrows(ParseException.class, () -> rows.hasNext());
+    Assert.assertThrows(ParseException.class, () -> rows.next());
+  }
+
   private void assertDimensionEquals(InputRow row, String dimension, Object expected)
   {
     List<String> values = row.getDimension(dimension);
