Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exporter-otlp: adding metrics exporter structure #2323

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Decode URL-encoded headers in environment variables
([#2312](https://github.com/open-telemetry/opentelemetry-python/pull/2312))
- [exporter/opentelemetry-exporter-otlp-proto-grpc] Add OTLPMetricExporter
([#2323](https://github.com/open-telemetry/opentelemetry-python/pull/2323))

## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.


class MetricExporter:
pass
# metrics.py
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
# TODO
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, Sequence
from grpc import ChannelCredentials, Compression
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
OTLPExporterMixin,
get_resource_data,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceRequest,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import (
MetricsServiceStub,
)
from opentelemetry.proto.common.v1.common_pb2 import InstrumentationLibrary
from opentelemetry.proto.metrics.v1.metrics_pb2 import (
InstrumentationLibraryMetrics,
ResourceMetrics,
)
from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric as PB2Metric
from opentelemetry.sdk._metrics.data import (
MetricData,
)

from opentelemetry.sdk._metrics.export import (
MetricExporter,
MetricExportResult,
)


class OTLPMetricExporter(
MetricExporter,
OTLPExporterMixin[MetricData, ExportMetricsServiceRequest, MetricExportResult]
codeboten marked this conversation as resolved.
Show resolved Hide resolved
):
_result = MetricExportResult
_stub = MetricsServiceStub

def __init__(
self,
endpoint: Optional[str] = None,
insecure: Optional[bool] = None,
credentials: Optional[ChannelCredentials] = None,
headers: Optional[Sequence] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
):
super().__init__(
**{
"endpoint": endpoint,
"insecure": insecure,
"credentials": credentials,
"headers": headers,
"timeout": timeout,
"compression": compression,
}
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
)

def _translate_data(
self, data: Sequence[MetricData]
) -> ExportMetricsServiceRequest:
sdk_resource_instrumentation_library_metrics = {}
self._collector_metric_kwargs = {}

for metric_data in data:
resource = metric_data.metric.resource
instrumentation_library_map = (
sdk_resource_instrumentation_library_metrics.get(resource, {})
)
if not instrumentation_library_map:
sdk_resource_instrumentation_library_metrics[
resource
] = instrumentation_library_map

instrumentation_library_metrics = instrumentation_library_map.get(
metric_data.instrumentation_info
)

if not instrumentation_library_metrics:
if metric_data.instrumentation_info is not None:
instrumentation_library_map[
metric_data.instrumentation_info
] = InstrumentationLibraryMetrics(
instrumentation_library=InstrumentationLibrary(
name=metric_data.instrumentation_info.name,
version=metric_data.instrumentation_info.version,
)
)
else:
instrumentation_library_map[
metric_data.instrumentation_info
] = InstrumentationLibraryMetrics()
lzchen marked this conversation as resolved.
Show resolved Hide resolved

instrumentation_library_metrics = instrumentation_library_map.get(
metric_data.instrumentation_info
)

instrumentation_library_metrics.metrics.append(
PB2Metric(**self._collector_metric_kwargs)
)
return ExportMetricsServiceRequest(
resource_metrics=get_resource_data(
sdk_resource_instrumentation_library_metrics,
ResourceMetrics,
"metrics",
)
)

def export(self, metrics: Sequence[MetricData]) -> MetricExportResult:
return self._export(metrics)

def shutdown(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def setUp(self):

self.server = server(ThreadPoolExecutor(max_workers=10))

self.server.add_insecure_port("[::]:4317")
self.server.add_insecure_port("127.0.0.1:4317")

self.server.start()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# Copyright The OpenTelemetry Authors
codeboten marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import ThreadPoolExecutor
from unittest import TestCase
from unittest.mock import patch

from google.protobuf.duration_pb2 import Duration
from google.rpc.error_details_pb2 import RetryInfo
from grpc import StatusCode, server

from opentelemetry.exporter.otlp.proto.grpc._metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
ExportMetricsServiceResponse,
)
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2_grpc import (
MetricsServiceServicer,
add_MetricsServiceServicer_to_server,
)
from opentelemetry.sdk._metrics.data import Metric, MetricData
from opentelemetry.sdk._metrics.export import MetricExportResult
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo


class MetricsServiceServicerUNAVAILABLEDelay(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.UNAVAILABLE)

context.send_initial_metadata(
(("google.rpc.retryinfo-bin", RetryInfo().SerializeToString()),)
)
context.set_trailing_metadata(
(
(
"google.rpc.retryinfo-bin",
RetryInfo(
retry_delay=Duration(seconds=4)
).SerializeToString(),
),
)
)

return ExportMetricsServiceResponse()


class MetricsServiceServicerUNAVAILABLE(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.UNAVAILABLE)

return ExportMetricsServiceResponse()


class MetricsServiceServicerSUCCESS(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.OK)

return ExportMetricsServiceResponse()


class MetricsServiceServicerALREADY_EXISTS(MetricsServiceServicer):
# pylint: disable=invalid-name,unused-argument,no-self-use
def Export(self, request, context):
context.set_code(StatusCode.ALREADY_EXISTS)

return ExportMetricsServiceResponse()


class TestOTLPMetricExporter(TestCase):
def setUp(self):

self.exporter = OTLPMetricExporter()

self.server = server(ThreadPoolExecutor(max_workers=10))

self.server.add_insecure_port("127.0.0.1:4317")

self.server.start()

self.metric_data_1 = MetricData(
metric=Metric(
resource=SDKResource({"key": "value"}),
),
instrumentation_info=InstrumentationInfo(
"first_name", "first_version"
),
)

def tearDown(self):
self.server.stop(None)

@patch(
"opentelemetry.exporter.otlp.proto.grpc.exporter.ssl_channel_credentials"
)
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
@patch(
"opentelemetry.exporter.otlp.proto.grpc._metric_exporter.OTLPMetricExporter._stub"
)
# pylint: disable=unused-argument
def test_no_credentials_error(
self, mock_ssl_channel, mock_secure, mock_stub
):
OTLPMetricExporter(insecure=False)
self.assertTrue(mock_ssl_channel.called)

# pylint: disable=no-self-use
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.secure_channel")
def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
expected_endpoint = "localhost:4317"
endpoints = [
(
"http://localhost:4317",
None,
mock_insecure,
),
(
"localhost:4317",
None,
mock_insecure,
),
(
"localhost:4317",
False,
mock_secure,
),
(
"https://localhost:4317",
None,
mock_secure,
),
(
"https://localhost:4317",
True,
mock_insecure,
),
]
# pylint: disable=C0209
for endpoint, insecure, mock_method in endpoints:
OTLPMetricExporter(endpoint=endpoint, insecure=insecure)
self.assertEqual(
1,
mock_method.call_count,
"expected {} to be called for {} {}".format(
mock_method, endpoint, insecure
),
)
self.assertEqual(
expected_endpoint,
mock_method.call_args[0][0],
"expected {} got {} {}".format(
expected_endpoint, mock_method.call_args[0][0], endpoint
),
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

mock_expo.configure_mock(**{"return_value": [1]})

add_MetricsServiceServicer_to_server(
MetricsServiceServicerUNAVAILABLE(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.FAILURE,
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

mock_expo.configure_mock(**{"return_value": [1]})

add_MetricsServiceServicer_to_server(
MetricsServiceServicerUNAVAILABLEDelay(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.FAILURE,
)
mock_sleep.assert_called_with(4)

def test_success(self):
add_MetricsServiceServicer_to_server(
MetricsServiceServicerSUCCESS(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.SUCCESS,
)

def test_failure(self):
add_MetricsServiceServicer_to_server(
MetricsServiceServicerALREADY_EXISTS(), self.server
)
self.assertEqual(
self.exporter.export([self.metric_data_1]),
MetricExportResult.FAILURE,
)
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def setUp(self):

self.server = server(ThreadPoolExecutor(max_workers=10))

self.server.add_insecure_port("[::]:4317")
self.server.add_insecure_port("127.0.0.1:4317")

self.server.start()

Expand Down
Loading