Skip to content

Commit

Permalink
Bug fix: detect and adapt to backoff package version (#2980)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickstenning committed Oct 31, 2022
1 parent 11f49d5 commit 370af5f
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 19 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2976](https://github.com/open-telemetry/opentelemetry-python/pull/2976))
- [exporter/opentelemetry-exporter-otlp-proto-http] Add OTLPMetricExporter
([#2891](https://github.com/open-telemetry/opentelemetry-python/pull/2891))
- Fix a bug with exporter retries for with newer versions of the backoff library
([#2980](https://github.com/open-telemetry/opentelemetry-python/pull/2980))

## [1.13.0-0.34b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.13.0) - 2022-09-26

Expand Down Expand Up @@ -90,7 +92,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2726](https://github.com/open-telemetry/opentelemetry-python/pull/2726))
- fix: frozenset object has no attribute items
([#2727](https://github.com/open-telemetry/opentelemetry-python/pull/2727))
- fix: create suppress HTTP instrumentation key in opentelemetry context
- fix: create suppress HTTP instrumentation key in opentelemetry context
([#2729](https://github.com/open-telemetry/opentelemetry-python/pull/2729))
- Support logs SDK auto instrumentation enable/disable with env
([#2728](https://github.com/open-telemetry/opentelemetry-python/pull/2728))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

from backoff import expo
import backoff
from google.rpc.error_details_pb2 import RetryInfo
from grpc import (
ChannelCredentials,
Expand Down Expand Up @@ -183,6 +183,19 @@ def _get_credentials(creds, environ_key):
return ssl_channel_credentials()


# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


# pylint: disable=no-member
class OTLPExporterMixin(
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
Expand Down Expand Up @@ -296,7 +309,7 @@ def _export(
# expo returns a generator that yields delay values which grow
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in expo(max_value=max_value):
for delay in _expo(max_value=max_value):

if delay == max_value:
return self._result.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -265,7 +265,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure):
)
mock_method.reset_mock()

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -464,7 +464,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_environ_to_compression(self):
with self.assertRaises(InvalidCompressionValueException):
environ_to_compression("test_invalid")

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
def test_export_warning(self, mock_expo):

mock_expo.configure_mock(**{"return_value": [0]})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,23 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure):
# pylint: disable=protected-access
self.assertIsNone(exporter._headers, None)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

add_TraceServiceServicer_to_server(
TraceServiceServicerUNAVAILABLE(), self.server
)
self.exporter.export([self.span])
mock_sleep.assert_called_once_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable(self, mock_sleep, mock_expo):

Expand All @@ -450,7 +466,7 @@ def test_unavailable(self, mock_sleep, mock_expo):
)
mock_sleep.assert_called_with(1)

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep")
def test_unavailable_delay(self, mock_sleep, mock_expo):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ dependencies = [
]

[project.optional-dependencies]
test = []
test = [
"responses == 0.22.0",
]

[project.entry-points.opentelemetry_traces_exporter]
otlp_proto_http = "opentelemetry.exporter.otlp.proto.http.trace_exporter:OTLPSpanExporter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional, Sequence
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
Expand Down Expand Up @@ -53,6 +53,18 @@
DEFAULT_LOGS_EXPORT_PATH = "v1/logs"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPLogExporter(LogExporter):

Expand Down Expand Up @@ -122,7 +134,7 @@ def export(self, batch: Sequence[LogData]) -> LogExportResult:

serialized_data = _ProtobufEncoder.serialize(batch)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return LogExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.util.re import parse_headers

import backoff
import requests
from backoff import expo

_logger = logging.getLogger(__name__)

Expand All @@ -77,6 +77,18 @@
DEFAULT_METRICS_EXPORT_PATH = "v1/metrics"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPMetricExporter(MetricExporter):

Expand Down Expand Up @@ -319,7 +331,7 @@ def export(
**kwargs,
) -> MetricExportResult:
serialized_data = self._translate_data(metrics_data)
for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return MetricExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from typing import Dict, Optional
from time import sleep

import backoff
import requests
from backoff import expo

from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE,
Expand Down Expand Up @@ -54,6 +54,18 @@
DEFAULT_TRACES_EXPORT_PATH = "v1/traces"
DEFAULT_TIMEOUT = 10 # in seconds

# Work around API change between backoff 1.x and 2.x. Since 2.0.0 the backoff
# wait generator API requires a first .send(None) before reading the backoff
# values from the generator.
_is_backoff_v2 = next(backoff.expo()) is None


def _expo(*args, **kwargs):
gen = backoff.expo(*args, **kwargs)
if _is_backoff_v2:
gen.send(None)
return gen


class OTLPSpanExporter(SpanExporter):

Expand Down Expand Up @@ -133,7 +145,7 @@ def export(self, spans) -> SpanExportResult:

serialized_data = _ProtobufEncoder.serialize(spans)

for delay in expo(max_value=self._MAX_RETRY_TIMEOUT):
for delay in _expo(max_value=self._MAX_RETRY_TIMEOUT):

if delay == self._MAX_RETRY_TIMEOUT:
return SpanExportResult.FAILURE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from unittest.mock import patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
Expand Down Expand Up @@ -269,3 +270,30 @@ def test_serialization(self, mock_post):
verify=exporter._certificate_file,
timeout=exporter._timeout,
)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://metrics.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPMetricExporter(
endpoint="http://metrics.example.com/export"
)
metrics_data = self.metrics["sum_int"]

exporter.export(metrics_data)
mock_sleep.assert_called_once_with(1)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from unittest.mock import MagicMock, patch

import requests
import responses

from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
Expand Down Expand Up @@ -159,6 +160,31 @@ def test_serialize(self):
expected_encoding.SerializeToString(),
)

@responses.activate
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.backoff")
@patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
def test_handles_backoff_v2_api(self, mock_sleep, mock_backoff):
# In backoff ~= 2.0.0 the first value yielded from expo is None.
def generate_delays(*args, **kwargs):
yield None
yield 1

mock_backoff.expo.configure_mock(**{"side_effect": generate_delays})

# return a retryable error
responses.add(
responses.POST,
"http://logs.example.com/export",
json={"error": "something exploded"},
status=500,
)

exporter = OTLPLogExporter(endpoint="http://logs.example.com/export")
logs = self._get_sdk_log_data()

exporter.export(logs)
mock_sleep.assert_called_once_with(1)

@staticmethod
def _get_sdk_log_data() -> List[LogData]:
log1 = LogData(
Expand Down
Loading

0 comments on commit 370af5f

Please sign in to comment.