Skip to content

Commit

Permalink
Rename MetricRecord to ExportRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
shovnik committed Nov 12, 2020
1 parent 26bf23f commit baf3b6e
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import opentelemetry.exporter.opencensus.util as utils
from opentelemetry.sdk.metrics import Counter, Metric
from opentelemetry.sdk.metrics.export import (
MetricRecord,
ExportRecord,
MetricsExporter,
MetricsExportResult,
)
Expand Down Expand Up @@ -79,11 +79,11 @@ def __init__(
self.exporter_start_timestamp.GetCurrentTime()

def export(
self, metric_records: Sequence[MetricRecord]
) -> MetricsExportResult:
self, export_records: Sequence[ExportRecord]
) -> MetricsExportResult:
try:
responses = self.client.Export(
self.generate_metrics_requests(metric_records)
self.generate_metrics_requests(export_records)
)

# Read response
Expand All @@ -99,7 +99,7 @@ def shutdown(self) -> None:
pass

def generate_metrics_requests(
self, metrics: Sequence[MetricRecord]
self, metrics: Sequence[ExportRecord]
) -> metrics_service_pb2.ExportMetricsServiceRequest:
collector_metrics = translate_to_collector(
metrics, self.exporter_start_timestamp
Expand All @@ -112,15 +112,15 @@ def generate_metrics_requests(

# pylint: disable=too-many-branches
def translate_to_collector(
metric_records: Sequence[MetricRecord],
export_records: Sequence[ExportRecord],
exporter_start_timestamp: Timestamp,
) -> Sequence[metrics_pb2.Metric]:
collector_metrics = []
for metric_record in metric_records:
for export_record in export_records:

label_values = []
label_keys = []
for label_tuple in metric_record.labels:
for label_tuple in export_record.labels:
label_keys.append(metrics_pb2.LabelKey(key=label_tuple[0]))
label_values.append(
metrics_pb2.LabelValue(
Expand All @@ -130,30 +130,30 @@ def translate_to_collector(
)

metric_descriptor = metrics_pb2.MetricDescriptor(
name=metric_record.instrument.name,
description=metric_record.instrument.description,
unit=metric_record.instrument.unit,
type=get_collector_metric_type(metric_record.instrument),
name=export_record.instrument.name,
description=export_record.instrument.description,
unit=export_record.instrument.unit,
type=get_collector_metric_type(export_record.instrument),
label_keys=label_keys,
)

# If cumulative and stateful, explicitly set the start_timestamp to
# exporter start time.
if metric_record.instrument.meter.processor.stateful:
if export_record.instrument.meter.processor.stateful:
start_timestamp = exporter_start_timestamp
else:
start_timestamp = None

timeseries = metrics_pb2.TimeSeries(
label_values=label_values,
points=[get_collector_point(metric_record)],
points=[get_collector_point(export_record)],
start_timestamp=start_timestamp,
)
collector_metrics.append(
metrics_pb2.Metric(
metric_descriptor=metric_descriptor,
timeseries=[timeseries],
resource=get_resource(metric_record),
resource=get_resource(export_record),
)
)
return collector_metrics
Expand All @@ -169,29 +169,29 @@ def get_collector_metric_type(metric: Metric) -> metrics_pb2.MetricDescriptor:
return metrics_pb2.MetricDescriptor.UNSPECIFIED


def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
def get_collector_point(export_record: ExportRecord) -> metrics_pb2.Point:
# TODO: horrible hack to get original list of keys to then get the bound
# instrument
point = metrics_pb2.Point(
timestamp=utils.proto_timestamp_from_time_ns(
metric_record.aggregator.last_update_timestamp
export_record.aggregator.last_update_timestamp
)
)
if metric_record.instrument.value_type == int:
point.int64_value = metric_record.aggregator.checkpoint
elif metric_record.instrument.value_type == float:
point.double_value = metric_record.aggregator.checkpoint
if export_record.instrument.value_type == int:
point.int64_value = export_record.aggregator.checkpoint
elif export_record.instrument.value_type == float:
point.double_value = export_record.aggregator.checkpoint
else:
raise TypeError(
"Unsupported metric type: {}".format(
metric_record.instrument.value_type
export_record.instrument.value_type
)
)
return point


def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource:
resource_attributes = metric_record.resource.attributes
def get_resource(export_record: ExportRecord) -> resource_pb2.Resource:
resource_attributes = export_record.resource.attributes
return resource_pb2.Resource(
type=infer_oc_resource_type(resource_attributes),
labels={k: str(v) for k, v in resource_attributes.items()},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
get_dict_as_key,
)
from opentelemetry.sdk.metrics.export import (
MetricRecord,
ExportRecord,
MetricsExportResult,
aggregate,
)
Expand Down Expand Up @@ -100,7 +100,7 @@ def test_get_collector_point(self):
"testName", "testDescription", "unit", float,
)
result = metrics_exporter.get_collector_point(
MetricRecord(
ExportRecord(
int_counter,
self._key_labels,
aggregator,
Expand All @@ -113,7 +113,7 @@ def test_get_collector_point(self):
aggregator.update(123.5)
aggregator.take_checkpoint()
result = metrics_exporter.get_collector_point(
MetricRecord(
ExportRecord(
float_counter,
self._key_labels,
aggregator,
Expand All @@ -124,7 +124,7 @@ def test_get_collector_point(self):
self.assertRaises(
TypeError,
metrics_exporter.get_collector_point(
MetricRecord(
ExportRecord(
valuerecorder,
self._key_labels,
aggregator,
Expand All @@ -144,7 +144,7 @@ def test_export(self):
test_metric = self._meter.create_counter(
"testname", "testdesc", "unit", int, self._labels.keys(),
)
record = MetricRecord(
record = ExportRecord(
test_metric,
self._key_labels,
aggregate.SumAggregator(),
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_translate_to_collector(self):
aggregator = aggregate.SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(
record = ExportRecord(
test_metric,
self._key_labels,
aggregator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
ValueRecorder,
)
from opentelemetry.sdk.metrics.export import (
MetricRecord,
ExportRecord,
MetricsExporter,
MetricsExportResult,
)
Expand All @@ -71,47 +71,45 @@


def _get_data_points(
sdk_metric_record: MetricRecord, data_point_class: Type[DataPointT]
export_record: ExportRecord, data_point_class: Type[DataPointT]
) -> List[DataPointT]:

if isinstance(sdk_metric_record.aggregator, SumAggregator):
value = sdk_metric_record.aggregator.checkpoint
if isinstance(export_record.aggregator, SumAggregator):
value = export_record.aggregator.checkpoint

elif isinstance(sdk_metric_record.aggregator, MinMaxSumCountAggregator):
elif isinstance(export_record.aggregator, MinMaxSumCountAggregator):
# FIXME: How are values to be interpreted from this aggregator?
raise Exception("MinMaxSumCount aggregator data not supported")

elif isinstance(sdk_metric_record.aggregator, HistogramAggregator):
elif isinstance(export_record.aggregator, HistogramAggregator):
# FIXME: How are values to be interpreted from this aggregator?
raise Exception("Histogram aggregator data not supported")

elif isinstance(sdk_metric_record.aggregator, LastValueAggregator):
value = sdk_metric_record.aggregator.checkpoint
elif isinstance(export_record.aggregator, LastValueAggregator):
value = export_record.aggregator.checkpoint

elif isinstance(sdk_metric_record.aggregator, ValueObserverAggregator):
value = sdk_metric_record.aggregator.checkpoint.last
elif isinstance(export_record.aggregator, ValueObserverAggregator):
value = export_record.aggregator.checkpoint.last

return [
data_point_class(
labels=[
StringKeyValue(key=str(label_key), value=str(label_value))
for label_key, label_value in sdk_metric_record.labels
for label_key, label_value in export_record.labels
],
value=value,
start_time_unix_nano=(
sdk_metric_record.aggregator.initial_checkpoint_timestamp
),
time_unix_nano=(
sdk_metric_record.aggregator.last_update_timestamp
export_record.aggregator.initial_checkpoint_timestamp
),
time_unix_nano=(export_record.aggregator.last_update_timestamp),
)
]


class OTLPMetricsExporter(
MetricsExporter,
OTLPExporterMixin[
MetricRecord, ExportMetricsServiceRequest, MetricsExportResult
ExportRecord, ExportMetricsServiceRequest, MetricsExportResult
],
):
# pylint: disable=unsubscriptable-object
Expand Down Expand Up @@ -162,14 +160,14 @@ def __init__(

# pylint: disable=no-self-use
def _translate_data(
self, data: Sequence[MetricRecord]
self, export_records: Sequence[ExportRecord]
) -> ExportMetricsServiceRequest:
# pylint: disable=too-many-locals,no-member
# pylint: disable=attribute-defined-outside-init

sdk_resource_instrumentation_library_metrics = {}

# The criteria to decide how to translate data is based on this table
# The criteria to decide how to translate export_records is based on this table
# taken directly from OpenTelemetry Proto v0.5.0:

# TODO: Update table after the decision on:
Expand All @@ -185,13 +183,13 @@ def _translate_data(
# SumObserver Sum(aggregation_temporality=cumulative;is_monotonic=true)
# UpDownSumObserver Sum(aggregation_temporality=cumulative;is_monotonic=false)
# ValueObserver Gauge()
for sdk_metric_record in data:
for export_record in export_records:

if sdk_metric_record.resource not in (
if export_record.resource not in (
sdk_resource_instrumentation_library_metrics.keys()
):
sdk_resource_instrumentation_library_metrics[
sdk_metric_record.resource
export_record.resource
] = InstrumentationLibraryMetrics()

type_class = {
Expand All @@ -210,16 +208,16 @@ def _translate_data(
},
}

value_type = sdk_metric_record.instrument.value_type
value_type = export_record.instrument.value_type

sum_class = type_class[value_type]["sum"]["class"]
gauge_class = type_class[value_type]["gauge"]["class"]
data_point_class = type_class[value_type]["data_point_class"]

if isinstance(sdk_metric_record.instrument, Counter):
if isinstance(export_record.instrument, Counter):
otlp_metric_data = sum_class(
data_points=_get_data_points(
sdk_metric_record, data_point_class
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
Expand All @@ -228,10 +226,10 @@ def _translate_data(
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric_record.instrument, UpDownCounter):
elif isinstance(export_record.instrument, UpDownCounter):
otlp_metric_data = sum_class(
data_points=_get_data_points(
sdk_metric_record, data_point_class
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA
Expand All @@ -240,14 +238,14 @@ def _translate_data(
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric_record.instrument, (ValueRecorder)):
elif isinstance(export_record.instrument, (ValueRecorder)):
logger.warning("Skipping exporting of ValueRecorder metric")
continue

elif isinstance(sdk_metric_record.instrument, SumObserver):
elif isinstance(export_record.instrument, SumObserver):
otlp_metric_data = sum_class(
data_points=_get_data_points(
sdk_metric_record, data_point_class
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
Expand All @@ -256,10 +254,10 @@ def _translate_data(
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric_record.instrument, UpDownSumObserver):
elif isinstance(export_record.instrument, UpDownSumObserver):
otlp_metric_data = sum_class(
data_points=_get_data_points(
sdk_metric_record, data_point_class
export_record, data_point_class
),
aggregation_temporality=(
AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE
Expand All @@ -268,24 +266,22 @@ def _translate_data(
)
argument = type_class[value_type]["sum"]["argument"]

elif isinstance(sdk_metric_record.instrument, (ValueObserver)):
elif isinstance(export_record.instrument, (ValueObserver)):
otlp_metric_data = gauge_class(
data_points=_get_data_points(
sdk_metric_record, data_point_class
export_record, data_point_class
)
)
argument = type_class[value_type]["gauge"]["argument"]

sdk_resource_instrumentation_library_metrics[
sdk_metric_record.resource
export_record.resource
].metrics.append(
OTLPMetric(
**{
"name": sdk_metric_record.instrument.name,
"description": (
sdk_metric_record.instrument.description
),
"unit": sdk_metric_record.instrument.unit,
"name": export_record.instrument.name,
"description": (export_record.instrument.description),
"unit": export_record.instrument.unit,
argument: otlp_metric_data,
}
)
Expand All @@ -299,6 +295,6 @@ def _translate_data(
)
)

def export(self, metrics: Sequence[MetricRecord]) -> MetricsExportResult:
def export(self, metrics: Sequence[ExportRecord]) -> MetricsExportResult:
# pylint: disable=arguments-differ
return self._export(metrics)
Loading

0 comments on commit baf3b6e

Please sign in to comment.