Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OC Exporter - send start_timestamp, resource labels, and convert labels to strings #937

Merged
merged 9 commits into from
Aug 13, 2020
2 changes: 2 additions & 0 deletions exporter/opentelemetry-exporter-opencensus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Send start_timestamp and convert labels to strings
([#937](https://github.com/open-telemetry/opentelemetry-python/pull/937))
- Change package name to opentelemetry-exporter-opencensus
([#953](https://github.com/open-telemetry/opentelemetry-python/pull/953))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
"""OpenCensus Collector Metrics Exporter."""

import logging
from typing import Sequence
from typing import Dict, Sequence

import grpc
from google.protobuf.timestamp_pb2 import Timestamp
from opencensus.proto.agent.metrics.v1 import (
metrics_service_pb2,
metrics_service_pb2_grpc,
)
from opencensus.proto.metrics.v1 import metrics_pb2
from opencensus.proto.resource.v1 import resource_pb2

import opentelemetry.exporter.opencensus.util as utils
from opentelemetry.sdk.metrics import Counter, Metric
Expand All @@ -34,6 +36,14 @@

DEFAULT_ENDPOINT = "localhost:55678"

# In priority order. See collector impl https://bit.ly/2DvJW6y
OT_LABEL_PRESENCE_TO_RESOURCE_TYPE = [
aabmass marked this conversation as resolved.
Show resolved Hide resolved
("container.name", "container"),
("k8s.pod.name", "k8s"),
("host.name", "host"),
("cloud.provider", "cloud"),
]

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -65,6 +75,8 @@ def __init__(
self.client = client

self.node = utils.get_node(service_name, host_name)
self.exporter_start_timestamp = Timestamp()
self.exporter_start_timestamp.GetCurrentTime()

def export(
self, metric_records: Sequence[MetricRecord]
Expand All @@ -89,7 +101,9 @@ def shutdown(self) -> None:
def generate_metrics_requests(
self, metrics: Sequence[MetricRecord]
) -> metrics_service_pb2.ExportMetricsServiceRequest:
collector_metrics = translate_to_collector(metrics)
collector_metrics = translate_to_collector(
metrics, self.exporter_start_timestamp
)
service_request = metrics_service_pb2.ExportMetricsServiceRequest(
node=self.node, metrics=collector_metrics
)
Expand All @@ -99,6 +113,7 @@ def generate_metrics_requests(
# pylint: disable=too-many-branches
def translate_to_collector(
metric_records: Sequence[MetricRecord],
exporter_start_timestamp: Timestamp,
) -> Sequence[metrics_pb2.Metric]:
collector_metrics = []
for metric_record in metric_records:
Expand All @@ -109,7 +124,8 @@ def translate_to_collector(
label_keys.append(metrics_pb2.LabelKey(key=label_tuple[0]))
label_values.append(
metrics_pb2.LabelValue(
has_value=label_tuple[1] is not None, value=label_tuple[1]
has_value=label_tuple[1] is not None,
value=str(label_tuple[1]),
)
)

Expand All @@ -121,13 +137,23 @@ def translate_to_collector(
label_keys=label_keys,
)

# If cumulative and stateful, explicitly set the start_timestamp to
# exporter start time.
if metric_record.instrument.meter.batcher.stateful:
aabmass marked this conversation as resolved.
Show resolved Hide resolved
start_timestamp = exporter_start_timestamp
else:
start_timestamp = None

timeseries = metrics_pb2.TimeSeries(
label_values=label_values,
points=[get_collector_point(metric_record)],
start_timestamp=start_timestamp,
)
collector_metrics.append(
metrics_pb2.Metric(
metric_descriptor=metric_descriptor, timeseries=[timeseries]
metric_descriptor=metric_descriptor,
timeseries=[timeseries],
resource=get_resource(metric_record),
)
)
return collector_metrics
Expand Down Expand Up @@ -162,3 +188,22 @@ def get_collector_point(metric_record: MetricRecord) -> metrics_pb2.Point:
)
)
return point


def get_resource(metric_record: MetricRecord) -> resource_pb2.Resource:
resource_labels = metric_record.instrument.meter.resource.labels
return resource_pb2.Resource(
type=infer_oc_resource_type(resource_labels),
labels={k: str(v) for k, v in resource_labels.items()},
)


def infer_oc_resource_type(resource_labels: Dict[str, str]) -> str:
"""Convert from OT resource labels to OC resource type"""
for (
ot_resource_key,
oc_resource_type,
) in OT_LABEL_PRESENCE_TO_RESOURCE_TYPE:
if ot_resource_key in resource_labels:
return oc_resource_type
return ""
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,24 @@
MetricsExportResult,
aggregate,
)
from opentelemetry.sdk.resources import Resource


# pylint: disable=no-member
class TestCollectorMetricsExporter(unittest.TestCase):
@classmethod
def setUpClass(cls):
# pylint: disable=protected-access
metrics.set_meter_provider(MeterProvider())
cls._resource_labels = {
"key_with_str_value": "some string",
"key_with_int_val": 321,
"key_with_true": True,
}
metrics.set_meter_provider(
MeterProvider(resource=Resource(cls._resource_labels))
)
cls._meter = metrics.get_meter(__name__)
cls._labels = {"environment": "staging"}
cls._labels = {"environment": "staging", "number": 321}
cls._key_labels = get_dict_as_key(cls._labels)

def test_constructor(self):
Expand Down Expand Up @@ -119,7 +127,7 @@ def test_export(self):
client=mock_client, host_name=host_name
)
test_metric = self._meter.create_metric(
"testname", "testdesc", "unit", int, Counter,
"testname", "testdesc", "unit", int, Counter, self._labels.keys(),
)
record = MetricRecord(
test_metric, self._key_labels, aggregate.SumAggregator(),
Expand All @@ -142,13 +150,16 @@ def test_export(self):

def test_translate_to_collector(self):
test_metric = self._meter.create_metric(
"testname", "testdesc", "unit", int, Counter,
"testname", "testdesc", "unit", int, Counter, self._labels.keys()
)
aggregator = aggregate.SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
record = MetricRecord(test_metric, self._key_labels, aggregator,)
output_metrics = metrics_exporter.translate_to_collector([record])
start_timestamp = Timestamp()
output_metrics = metrics_exporter.translate_to_collector(
[record], start_timestamp,
)
self.assertEqual(len(output_metrics), 1)
self.assertIsInstance(output_metrics[0], metrics_pb2.Metric)
self.assertEqual(output_metrics[0].metric_descriptor.name, "testname")
Expand All @@ -161,14 +172,44 @@ def test_translate_to_collector(self):
metrics_pb2.MetricDescriptor.CUMULATIVE_INT64,
)
self.assertEqual(
len(output_metrics[0].metric_descriptor.label_keys), 1
len(output_metrics[0].metric_descriptor.label_keys), 2
)
self.assertEqual(
output_metrics[0].metric_descriptor.label_keys[0].key,
"environment",
)
self.assertEqual(
output_metrics[0].metric_descriptor.label_keys[1].key, "number",
)

self.assertIsNotNone(output_metrics[0].resource)
self.assertEqual(
output_metrics[0].resource.type, "",
)
self.assertEqual(
output_metrics[0].resource.labels["key_with_str_value"],
self._resource_labels["key_with_str_value"],
)
self.assertIsInstance(
output_metrics[0].resource.labels["key_with_int_val"], str,
)
self.assertEqual(
output_metrics[0].resource.labels["key_with_int_val"],
str(self._resource_labels["key_with_int_val"]),
)
self.assertIsInstance(
output_metrics[0].resource.labels["key_with_true"], str,
)
self.assertEqual(
output_metrics[0].resource.labels["key_with_true"],
str(self._resource_labels["key_with_true"]),
)

self.assertEqual(len(output_metrics[0].timeseries), 1)
self.assertEqual(len(output_metrics[0].timeseries[0].label_values), 1)
self.assertEqual(len(output_metrics[0].timeseries[0].label_values), 2)
self.assertEqual(
output_metrics[0].timeseries[0].start_timestamp, start_timestamp
)
self.assertEqual(
output_metrics[0].timeseries[0].label_values[0].has_value, True
)
Expand All @@ -187,3 +228,59 @@ def test_translate_to_collector(self):
self.assertEqual(
output_metrics[0].timeseries[0].points[0].int64_value, 123
)

def test_infer_ot_resource_type(self):
# empty resource
self.assertEqual(metrics_exporter.infer_oc_resource_type({}), "")

# container
self.assertEqual(
metrics_exporter.infer_oc_resource_type(
{
"k8s.cluster.name": "cluster1",
"k8s.pod.name": "pod1",
"k8s.namespace.name": "namespace1",
"container.name": "container-name1",
"cloud.account.id": "proj1",
"cloud.zone": "zone1",
}
),
"container",
)

# k8s pod
self.assertEqual(
metrics_exporter.infer_oc_resource_type(
{
"k8s.cluster.name": "cluster1",
"k8s.pod.name": "pod1",
"k8s.namespace.name": "namespace1",
"cloud.zone": "zone1",
}
),
"k8s",
)

# host
self.assertEqual(
metrics_exporter.infer_oc_resource_type(
{
"k8s.cluster.name": "cluster1",
"cloud.zone": "zone1",
"host.name": "node1",
}
),
"host",
)

# cloud
self.assertEqual(
metrics_exporter.infer_oc_resource_type(
{
"cloud.provider": "gcp",
"host.id": "inst1",
"cloud.zone": "zone1",
}
),
"cloud",
)