From fcb5a040318dd47287a3d49cd9e414ed3d5e0708 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?=
Date: Tue, 5 Jan 2021 12:21:38 -0800
Subject: [PATCH] deprecate OpenCensusInputRowParser in favor of
 OpenCensusProtobufInputFormat (#26)

InputRowParsers have been deprecated in favor of InputFormat. This
implements the InputFormat version of the OpenCensus Protobuf parser and
deprecates the existing InputRowParser implementation.

- the existing InputRowParser behavior is unchanged.
- the InputFormat behaves like the InputRowParser, except that the
  resource label prefix now defaults to "resource." instead of the
  empty string.
- both implementations internally delegate to OpenCensusProtobufReader,
  which is covered by the existing InputRowParser tests.
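For reference, a sketch of how the new format would be selected once this
module is loaded: the type name matches the NamedType registered in
OpenCensusProtobufExtensionsModule, the property names come from the
@JsonProperty fields, and the values shown are the new defaults (the
enclosing ingestion spec is assumed and not part of this change):

    "inputFormat": {
      "type": "opencensus-protobuf",
      "metricDimension": "name",
      "metricLabelPrefix": "",
      "resourceLabelPrefix": "resource."
    }

The legacy "opencensus-protobuf" parser type keeps its empty resource
prefix default, so existing parser-based specs are unaffected.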
---
 .../OpenCensusProtobufExtensionsModule.java   |   3 +-
 .../OpenCensusProtobufInputFormat.java        | 109 +++++++++
 .../OpenCensusProtobufInputRowParser.java     | 152 +-------------
 .../protobuf/OpenCensusProtobufReader.java    | 209 ++++++++++++++++++
 .../protobuf/OpenCensusInputFormatTest.java   |  56 +++++
 .../OpenCensusProtobufInputRowParserTest.java | 138 ++++++------
 6 files changed, 464 insertions(+), 203 deletions(-)
 create mode 100644 extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
 create mode 100644 extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
 create mode 100644 extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusInputFormatTest.java

diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufExtensionsModule.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufExtensionsModule.java
index 39576e4629d0..66a58c0eb28e 100644
--- a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufExtensionsModule.java
+++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufExtensionsModule.java
@@ -37,7 +37,8 @@ public List<? extends Module> getJacksonModules()
     return Collections.singletonList(
         new SimpleModule("OpenCensusProtobufInputRowParserModule")
             .registerSubtypes(
-                new NamedType(OpenCensusProtobufInputRowParser.class, "opencensus-protobuf")
+                new NamedType(OpenCensusProtobufInputRowParser.class, "opencensus-protobuf"),
+                new NamedType(OpenCensusProtobufInputFormat.class, "opencensus-protobuf")
             )
     );
   }
diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
new file mode 100644
index 000000000000..2676a818922a
--- /dev/null
+++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputFormat.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opencensus.protobuf;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.java.util.common.StringUtils;
+
+import java.io.File;
+import java.util.Objects;
+
+public class OpenCensusProtobufInputFormat implements InputFormat
+{
+  private static final String DEFAULT_METRIC_DIMENSION = "name";
+  private static final String DEFAULT_RESOURCE_PREFIX = "resource.";
+
+  private final String metricDimension;
+  private final String metricLabelPrefix;
+  private final String resourceLabelPrefix;
+
+  public OpenCensusProtobufInputFormat(
+      @JsonProperty("metricDimension") String metricDimension,
+      @JsonProperty("metricLabelPrefix") String metricLabelPrefix,
+      @JsonProperty("resourceLabelPrefix") String resourceLabelPrefix
+  )
+  {
+    this.metricDimension = metricDimension != null ? metricDimension : DEFAULT_METRIC_DIMENSION;
+    this.metricLabelPrefix = StringUtils.nullToEmptyNonDruidDataString(metricLabelPrefix);
+    this.resourceLabelPrefix = resourceLabelPrefix != null ? resourceLabelPrefix : DEFAULT_RESOURCE_PREFIX;
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    return false;
+  }
+
+  @Override
+  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+  {
+    return new OpenCensusProtobufReader(
+        inputRowSchema.getDimensionsSpec(),
+        (ByteEntity) source,
+        metricDimension,
+        metricLabelPrefix,
+        resourceLabelPrefix
+    );
+  }
+
+  @JsonProperty
+  public String getMetricDimension()
+  {
+    return metricDimension;
+  }
+
+  @JsonProperty
+  public String getMetricLabelPrefix()
+  {
+    return metricLabelPrefix;
+  }
+
+  @JsonProperty
+  public String getResourceLabelPrefix()
+  {
+    return resourceLabelPrefix;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof OpenCensusProtobufInputFormat)) {
+      return false;
+    }
+    OpenCensusProtobufInputFormat that = (OpenCensusProtobufInputFormat) o;
+    return Objects.equals(metricDimension, that.metricDimension)
+        && Objects.equals(metricLabelPrefix, that.metricLabelPrefix)
+        && Objects.equals(resourceLabelPrefix, that.resourceLabelPrefix);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(metricDimension, metricLabelPrefix, resourceLabelPrefix);
+  }
+}
diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java
index 087ec296c212..227a6ea5fb8b 100644
--- a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java
+++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParser.java
@@ -22,45 +22,29 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Timestamp;
-import io.opencensus.proto.metrics.v1.LabelKey;
-import io.opencensus.proto.metrics.v1.Metric;
-import io.opencensus.proto.metrics.v1.Point;
-import io.opencensus.proto.metrics.v1.TimeSeries;
 import org.apache.druid.data.input.ByteBufferInputRowParser;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
+/**
+ * use {@link OpenCensusProtobufInputFormat} instead
+ */
+@Deprecated
 public class OpenCensusProtobufInputRowParser implements ByteBufferInputRowParser
 {
   private static final Logger LOG = new Logger(OpenCensusProtobufInputRowParser.class);
 
-  private static final String SEPARATOR = "-";
-  private static final String VALUE_COLUMN = "value";
   private static final String DEFAULT_METRIC_DIMENSION = "name";
   private static final String DEFAULT_RESOURCE_PREFIX = "";
 
   private final ParseSpec parseSpec;
-  private final DimensionsSpec dimensionsSpec;
   private final String metricDimension;
   private final String metricLabelPrefix;
@@ -75,7 +59,6 @@ public OpenCensusProtobufInputRowParser(
   )
   {
     this.parseSpec = parseSpec;
-    this.dimensionsSpec = parseSpec.getDimensionsSpec();
     this.metricDimension = Strings.isNullOrEmpty(metricDimension) ? DEFAULT_METRIC_DIMENSION : metricDimension;
     this.metricLabelPrefix = StringUtils.nullToEmptyNonDruidDataString(metricPrefix);
     this.resourceLabelPrefix = resourcePrefix != null ? resourcePrefix : DEFAULT_RESOURCE_PREFIX;
@@ -117,127 +100,16 @@ public OpenCensusProtobufInputRowParser withParseSpec(ParseSpec parseSpec)
         resourceLabelPrefix);
   }
 
-
-  private interface LabelContext
-  {
-    void addRow(long millis, String metricName, Object value);
-  }
-
   @Override
   public List<InputRow> parseBatch(ByteBuffer input)
   {
-    final Metric metric;
-    try {
-      metric = Metric.parseFrom(input);
-    }
-    catch (InvalidProtocolBufferException e) {
-      throw new ParseException(e, "Protobuf message could not be parsed");
-    }
-
-    // Process metric descriptor labels map keys.
-    List<String> descriptorLabels = new ArrayList<>(metric.getMetricDescriptor().getLabelKeysCount());
-    for (LabelKey s : metric.getMetricDescriptor().getLabelKeysList()) {
-      descriptorLabels.add(this.metricLabelPrefix + s.getKey());
-    }
-
-    // Process resource labels map.
-    Map<String, String> resourceLabelsMap = CollectionUtils.mapKeys(
-        metric.getResource().getLabelsMap(),
-        key -> this.resourceLabelPrefix + key
-    );
-
-    final List<String> schemaDimensions = dimensionsSpec.getDimensionNames();
-
-    final List<String> dimensions;
-    if (!schemaDimensions.isEmpty()) {
-      dimensions = schemaDimensions;
-    } else {
-      Set<String> recordDimensions = new HashSet<>(descriptorLabels);
-
-      // Add resource map key set to record dimensions.
-      recordDimensions.addAll(resourceLabelsMap.keySet());
-
-      // MetricDimension, VALUE dimensions will not be present in labelKeysList or Metric.Resource
-      // map as they are derived dimensions, which get populated while parsing data for timeSeries
-      // hence add them to recordDimensions.
-      recordDimensions.add(metricDimension);
-      recordDimensions.add(VALUE_COLUMN);
-
-      dimensions = Lists.newArrayList(
-          Sets.difference(recordDimensions, dimensionsSpec.getDimensionExclusions())
-      );
-    }
-
-    final int capacity = resourceLabelsMap.size()
-        + descriptorLabels.size()
-        + 2; // metric name + value columns
-
-    List<InputRow> rows = new ArrayList<>();
-    for (TimeSeries ts : metric.getTimeseriesList()) {
-      final LabelContext labelContext = (millis, metricName, value) -> {
-        // Add common resourceLabels.
-        Map<String, Object> event = new HashMap<>(capacity);
-        event.putAll(resourceLabelsMap);
-        // Add metric labels
-        for (int i = 0; i < metric.getMetricDescriptor().getLabelKeysCount(); i++) {
-          event.put(descriptorLabels.get(i), ts.getLabelValues(i).getValue());
-        }
-        // add metric name and value
-        event.put(metricDimension, metricName);
-        event.put(VALUE_COLUMN, value);
-        rows.add(new MapBasedInputRow(millis, dimensions, event));
-      };
-
-      for (Point point : ts.getPointsList()) {
-        addPointRows(point, metric, labelContext);
-      }
-    }
-    return rows;
-  }
-
-  private void addPointRows(Point point, Metric metric, LabelContext labelContext)
-  {
-    Timestamp timestamp = point.getTimestamp();
-    long millis = Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
-    String metricName = metric.getMetricDescriptor().getName();
-
-    switch (point.getValueCase()) {
-      case DOUBLE_VALUE:
-        labelContext.addRow(millis, metricName, point.getDoubleValue());
-        break;
-
-      case INT64_VALUE:
-        labelContext.addRow(millis, metricName, point.getInt64Value());
-        break;
-
-      case SUMMARY_VALUE:
-        // count
-        labelContext.addRow(
-            millis,
-            metricName + SEPARATOR + "count",
-            point.getSummaryValue().getCount().getValue()
-        );
-        // sum
-        labelContext.addRow(
-            millis,
-            metricName + SEPARATOR + "sum",
-            point.getSummaryValue().getSnapshot().getSum().getValue()
-        );
-        break;
-
-      // TODO : How to handle buckets and percentiles
-      case DISTRIBUTION_VALUE:
-        // count
-        labelContext.addRow(millis, metricName + SEPARATOR + "count", point.getDistributionValue().getCount());
-        // sum
-        labelContext.addRow(
-            millis,
-            metricName + SEPARATOR + "sum",
-            point.getDistributionValue().getSum()
-        );
-        break;
-      default:
-    }
+    return new OpenCensusProtobufReader(
+        parseSpec.getDimensionsSpec(),
+        new ByteEntity(input),
+        metricDimension,
+        metricLabelPrefix,
+        resourceLabelPrefix
+    ).readAsList();
   }
 
   @Override
diff --git a/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
new file mode 100644
index 000000000000..9830defeaedb
--- /dev/null
+++ b/extensions-contrib/opencensus-extensions/src/main/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufReader.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opencensus.protobuf;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Timestamp;
+import io.opencensus.proto.metrics.v1.LabelKey;
+import io.opencensus.proto.metrics.v1.Metric;
+import io.opencensus.proto.metrics.v1.Point;
+import io.opencensus.proto.metrics.v1.TimeSeries;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.utils.CollectionUtils;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class OpenCensusProtobufReader implements InputEntityReader
+{
+  private static final String SEPARATOR = "-";
+  private static final String VALUE_COLUMN = "value";
+
+  private final DimensionsSpec dimensionsSpec;
+  private final ByteEntity source;
+  private final String metricDimension;
+  private final String metricLabelPrefix;
+  private final String resourceLabelPrefix;
+
+  public OpenCensusProtobufReader(
+      DimensionsSpec dimensionsSpec,
+      ByteEntity source,
+      String metricDimension,
+      String metricLabelPrefix,
+      String resourceLabelPrefix
+  )
+  {
+    this.dimensionsSpec = dimensionsSpec;
+    this.source = source;
+    this.metricDimension = metricDimension;
+    this.metricLabelPrefix = metricLabelPrefix;
+    this.resourceLabelPrefix = resourceLabelPrefix;
+  }
+
+  private interface LabelContext
+  {
+    void addRow(long millis, String metricName, Object value);
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read()
+  {
+    return CloseableIterators.withEmptyBaggage(readAsList().iterator());
+  }
+
+  List<InputRow> readAsList()
+  {
+    try {
+      return parseMetric(Metric.parseFrom(source.getBuffer()));
+    }
+    catch (InvalidProtocolBufferException e) {
+      throw new ParseException(e, "Protobuf message could not be parsed");
+    }
+  }
+
+  private List<InputRow> parseMetric(final Metric metric)
+  {
+    // Process metric descriptor labels map keys.
+    List<String> descriptorLabels = new ArrayList<>(metric.getMetricDescriptor().getLabelKeysCount());
+    for (LabelKey s : metric.getMetricDescriptor().getLabelKeysList()) {
+      descriptorLabels.add(this.metricLabelPrefix + s.getKey());
+    }
+
+    // Process resource labels map.
+    Map<String, String> resourceLabelsMap = CollectionUtils.mapKeys(
+        metric.getResource().getLabelsMap(),
+        key -> this.resourceLabelPrefix + key
+    );
+
+    final List<String> schemaDimensions = dimensionsSpec.getDimensionNames();
+
+    final List<String> dimensions;
+    if (!schemaDimensions.isEmpty()) {
+      dimensions = schemaDimensions;
+    } else {
+      Set<String> recordDimensions = new HashSet<>(descriptorLabels);
+
+      // Add resource map key set to record dimensions.
+      recordDimensions.addAll(resourceLabelsMap.keySet());
+
+      // MetricDimension, VALUE dimensions will not be present in labelKeysList or Metric.Resource
+      // map as they are derived dimensions, which get populated while parsing data for timeSeries
+      // hence add them to recordDimensions.
+      recordDimensions.add(metricDimension);
+      recordDimensions.add(VALUE_COLUMN);
+
+      dimensions = Lists.newArrayList(
+          Sets.difference(recordDimensions, dimensionsSpec.getDimensionExclusions())
+      );
+    }
+
+    final int capacity = resourceLabelsMap.size()
+        + descriptorLabels.size()
+        + 2; // metric name + value columns
+
+    List<InputRow> rows = new ArrayList<>();
+    for (TimeSeries ts : metric.getTimeseriesList()) {
+      final LabelContext labelContext = (millis, metricName, value) -> {
+        // Add common resourceLabels.
+        Map<String, Object> event = new HashMap<>(capacity);
+        event.putAll(resourceLabelsMap);
+        // Add metric labels
+        for (int i = 0; i < metric.getMetricDescriptor().getLabelKeysCount(); i++) {
+          event.put(descriptorLabels.get(i), ts.getLabelValues(i).getValue());
+        }
+        // add metric name and value
+        event.put(metricDimension, metricName);
+        event.put(VALUE_COLUMN, value);
+        rows.add(new MapBasedInputRow(millis, dimensions, event));
+      };
+
+      for (Point point : ts.getPointsList()) {
+        addPointRows(point, metric, labelContext);
+      }
+    }
+    return rows;
+  }
+
+  private void addPointRows(Point point, Metric metric, LabelContext labelContext)
+  {
+    Timestamp timestamp = point.getTimestamp();
+    long millis = Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()).toEpochMilli();
+    String metricName = metric.getMetricDescriptor().getName();
+
+    switch (point.getValueCase()) {
+      case DOUBLE_VALUE:
+        labelContext.addRow(millis, metricName, point.getDoubleValue());
+        break;
+
+      case INT64_VALUE:
+        labelContext.addRow(millis, metricName, point.getInt64Value());
+        break;
+
+      case SUMMARY_VALUE:
+        // count
+        labelContext.addRow(
+            millis,
+            metricName + SEPARATOR + "count",
+            point.getSummaryValue().getCount().getValue()
+        );
+        // sum
+        labelContext.addRow(
+            millis,
+            metricName + SEPARATOR + "sum",
+            point.getSummaryValue().getSnapshot().getSum().getValue()
+        );
+        break;
+
+      // TODO : How to handle buckets and percentiles
+      case DISTRIBUTION_VALUE:
+        // count
+        labelContext.addRow(millis, metricName + SEPARATOR + "count", point.getDistributionValue().getCount());
+        // sum
+        labelContext.addRow(
+            millis,
+            metricName + SEPARATOR + "sum",
+            point.getDistributionValue().getSum()
+        );
+        break;
+      default:
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample()
+  {
+    return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
+  }
+}
diff --git a/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusInputFormatTest.java b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusInputFormatTest.java
new file mode 100644
index 000000000000..2ee056b67448
--- /dev/null
+++ b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusInputFormatTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.opencensus.protobuf;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputFormat;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OpenCensusInputFormatTest
+{
+  @Test
+  public void testSerde() throws Exception
+  {
+    OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat("metric.name", "descriptor.", "custom.");
+
+    final ObjectMapper jsonMapper = new ObjectMapper();
+    jsonMapper.registerModules(new OpenCensusProtobufExtensionsModule().getJacksonModules());
+
+    final OpenCensusProtobufInputFormat actual = (OpenCensusProtobufInputFormat) jsonMapper.readValue(
+        jsonMapper.writeValueAsString(inputFormat),
+        InputFormat.class
+    );
+    Assert.assertEquals(inputFormat, actual);
+    Assert.assertEquals("metric.name", actual.getMetricDimension());
+    Assert.assertEquals("descriptor.", actual.getMetricLabelPrefix());
+    Assert.assertEquals("custom.", actual.getResourceLabelPrefix());
+  }
+
+  @Test
+  public void testDefaults()
+  {
+    OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat(null, null, null);
+
+    Assert.assertEquals("name", inputFormat.getMetricDimension());
+    Assert.assertEquals("", inputFormat.getMetricLabelPrefix());
+    Assert.assertEquals("resource.", inputFormat.getResourceLabelPrefix());
+  }
+}
diff --git a/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java
index 503dc754a3cc..2e0266e7dd9b 100644
--- a/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java
+++ b/extensions-contrib/opencensus-extensions/src/test/java/org/apache/druid/data/input/opencensus/protobuf/OpenCensusProtobufInputRowParserTest.java
@@ -35,18 +35,16 @@
 import io.opencensus.proto.metrics.v1.SummaryValue;
 import io.opencensus.proto.metrics.v1.TimeSeries;
 import io.opencensus.proto.resource.v1.Resource;
-import org.apache.druid.data.input.ByteBufferInputRowParser;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.JSONParseSpec;
-import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
 import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -62,51 +60,81 @@ public class OpenCensusProtobufInputRowParserTest
   private static final Timestamp TIMESTAMP = Timestamp.newBuilder()
       .setSeconds(INSTANT.getEpochSecond())
       .setNanos(INSTANT.getNano()).build();
 
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  private ParseSpec parseSpec;
+  static final JSONParseSpec PARSE_SPEC = new JSONParseSpec(
+      new TimestampSpec("timestamp", "millis", null),
+      new DimensionsSpec(null, null, null),
+      new JSONPathSpec(
+          true,
+          Lists.newArrayList(
+              new JSONPathFieldSpec(JSONPathFieldType.ROOT, "name", ""),
+              new JSONPathFieldSpec(JSONPathFieldType.ROOT, "value", ""),
+              new JSONPathFieldSpec(JSONPathFieldType.ROOT, "foo_key", "")
+          )
+      ), null, null
+  );
+
+  static final JSONParseSpec PARSE_SPEC_WITH_DIMENSIONS = new JSONParseSpec(
+      new TimestampSpec("timestamp", "millis", null),
+      new DimensionsSpec(ImmutableList.of(
+          new StringDimensionSchema("foo_key"),
+          new StringDimensionSchema("env_key")
+      ), null, null),
+      new JSONPathSpec(
+          true,
+          Lists.newArrayList(
+              new JSONPathFieldSpec(JSONPathFieldType.ROOT, "name", ""),
+              new JSONPathFieldSpec(JSONPathFieldType.ROOT, "value", ""),
+              new JSONPathFieldSpec(JSONPathFieldType.ROOT, "foo_key", "")
+          )
+      ), null, null
+  );
 
-  private ParseSpec parseSpecWithDimensions;
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
 
-  @Before
-  public void setUp()
+  @Test
+  public void testSerde() throws Exception
   {
-    parseSpec = new JSONParseSpec(
-        new TimestampSpec("timestamp", "millis", null),
-        new DimensionsSpec(null, null, null),
-        new JSONPathSpec(
-            true,
-            Lists.newArrayList(
-                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "name", ""),
-                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "value", ""),
-                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "foo_key", "")
-            )
-        ), null, null
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(
+        OpenCensusProtobufInputRowParserTest.PARSE_SPEC,
+        "metric.name",
+        "descriptor.",
+        "custom."
     );
 
-    parseSpecWithDimensions = new JSONParseSpec(
-        new TimestampSpec("timestamp", "millis", null),
-        new DimensionsSpec(ImmutableList.of(
-            new StringDimensionSchema("foo_key"),
-            new StringDimensionSchema("env_key")), null, null),
-        new JSONPathSpec(
-            true,
-            Lists.newArrayList(
-                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "name", ""),
-                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "value", ""),
-                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "foo_key", "")
-            )
-        ), null, null
+    final ObjectMapper jsonMapper = new ObjectMapper();
+    jsonMapper.registerModules(new OpenCensusProtobufExtensionsModule().getJacksonModules());
+
+    final OpenCensusProtobufInputRowParser actual = (OpenCensusProtobufInputRowParser) jsonMapper.readValue(
+        jsonMapper.writeValueAsString(parser),
+        InputRowParser.class
     );
+    Assert.assertEquals(parser, actual);
+    Assert.assertEquals("metric.name", actual.getMetricDimension());
+    Assert.assertEquals("descriptor.", actual.getMetricLabelPrefix());
+    Assert.assertEquals("custom.", actual.getResourceLabelPrefix());
   }
 
+  @Test
+  public void testDefaults()
+  {
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(
+        OpenCensusProtobufInputRowParserTest.PARSE_SPEC,
+        null, null, null
+    );
+
+    Assert.assertEquals("name", parser.getMetricDimension());
+    Assert.assertEquals("", parser.getMetricLabelPrefix());
+    Assert.assertEquals("", parser.getResourceLabelPrefix());
+  }
+
   @Test
   public void testDoubleGaugeParse()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, null, null, "");
 
     Metric metric = doubleGaugeMetric(TIMESTAMP);
@@ -124,7 +152,7 @@ public void testIntGaugeParse()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, null, null, "");
 
     Metric metric = intGaugeMetric(TIMESTAMP);
@@ -141,7 +169,7 @@ public void testSummaryParse()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, null, null, "");
 
     Metric metric = summaryMetric(TIMESTAMP);
@@ -166,7 +194,7 @@ public void testDistributionParse()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, null, null, "");
 
     Metric metric = distributionMetric(TIMESTAMP);
@@ -191,7 +219,7 @@ public void testDimensionsParseWithParseSpecDimensions()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpecWithDimensions, null, null, "");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC_WITH_DIMENSIONS, null, null, "");
 
     Metric metric = summaryMetric(TIMESTAMP);
@@ -212,10 +240,10 @@ public void testDimensionsParseWithParseSpecDimensions()
   }
 
   @Test
   public void testDimensionsParseWithoutParseSpecDimensions()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, "");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, null, null, "");
 
     Metric metric = summaryMetric(TIMESTAMP);
@@ -241,7 +269,7 @@ public void testDimensionsParseWithoutParseSpecDimensions()
   public void testMetricNameOverride()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, "dimension_name", null, "");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, "dimension_name", null, "");
 
     Metric metric = summaryMetric(Timestamp.getDefaultInstance());
@@ -266,7 +294,7 @@ public void testMetricNameOverride()
   public void testDefaultPrefix()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, null, null);
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, null, null, null);
 
     Metric metric = summaryMetric(Timestamp.getDefaultInstance());
@@ -291,7 +319,7 @@ public void testDefaultPrefix()
   public void testCustomPrefix()
   {
     //configure parser with desc file
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, null, "descriptor.", "custom.");
+    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(PARSE_SPEC, null, "descriptor.", "custom.");
 
     Metric metric = summaryMetric(Timestamp.getDefaultInstance());
@@ -312,20 +340,6 @@ public void testCustomPrefix()
     assertDimensionEquals(row, "custom.env_key", "env_val");
   }
 
-  @Test
-  public void testSerde() throws Exception
-  {
-    OpenCensusProtobufInputRowParser parser = new OpenCensusProtobufInputRowParser(parseSpec, "metric.name", "descriptor.", "custom.");
-
-    final ObjectMapper jsonMapper = new ObjectMapper();
-    jsonMapper.registerModules(new OpenCensusProtobufExtensionsModule().getJacksonModules());
-
-    Assert.assertEquals(parser, jsonMapper.readValue(
-        jsonMapper.writeValueAsString(parser),
-        ByteBufferInputRowParser.class
-    ));
-  }
-
   private void assertDimensionEquals(InputRow row, String dimension, Object expected)
   {
     List<String> values = row.getDimension(dimension);
@@ -334,7 +348,7 @@ private void assertDimensionEquals(InputRow row, String dimension, Object expect
     Assert.assertEquals(expected, values.get(0));
   }
 
-  private Metric doubleGaugeMetric(Timestamp timestamp)
+  static Metric doubleGaugeMetric(Timestamp timestamp)
   {
     return getMetric(
         "metric_gauge_double",
@@ -347,7 +361,7 @@ private Metric doubleGaugeMetric(Timestamp timestamp)
         timestamp);
   }
 
-  private Metric intGaugeMetric(Timestamp timestamp)
+  static Metric intGaugeMetric(Timestamp timestamp)
   {
     return getMetric(
         "metric_gauge_int64",
@@ -360,7 +374,7 @@ private Metric intGaugeMetric(Timestamp timestamp)
         timestamp);
   }
 
-  private Metric summaryMetric(Timestamp timestamp)
+  static Metric summaryMetric(Timestamp timestamp)
   {
     SummaryValue.Snapshot snapshot = SummaryValue.Snapshot.newBuilder()
@@ -408,7 +422,7 @@ private Metric summaryMetric(Timestamp timestamp)
         timestamp);
   }
 
-  private Metric distributionMetric(Timestamp timestamp)
+  static Metric distributionMetric(Timestamp timestamp)
   {
     DistributionValue distributionValue = DistributionValue.newBuilder()
         .setCount(100)
@@ -426,7 +440,7 @@ private Metric distributionMetric(Timestamp timestamp)
         timestamp);
   }
 
-  private Metric getMetric(String name, String description, MetricDescriptor.Type type, Point point, Timestamp timestamp)
+  static Metric getMetric(String name, String description, MetricDescriptor.Type type, Point point, Timestamp timestamp)
   {
     Metric dist = Metric.newBuilder()
         .setMetricDescriptor(