Skip to content

Commit

Permalink
Zipkin exporter v2 api support for protobuf format (#1318)
Browse files Browse the repository at this point in the history
  • Loading branch information
robwknox committed Nov 23, 2020
1 parent 03b2480 commit 8ca9e46
Show file tree
Hide file tree
Showing 10 changed files with 1,141 additions and 29 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ exclude =
__pycache__
exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/gen/
exporter/opentelemetry-exporter-jaeger/build/*
exporter/opentelemetry-exporter-zipkin/src/opentelemetry/exporter/zipkin/gen
docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/
docs/examples/opentelemetry-example-app/build/*
opentelemetry-proto/build/*
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ contextmanager-decorators=contextlib.contextmanager
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
generated-members=zipkin_pb2.*

# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
Expand Down
2 changes: 2 additions & 0 deletions exporter/opentelemetry-exporter-zipkin/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Support for v2 api protobuf format ([#1318](https://github.com/open-telemetry/opentelemetry-python/pull/1318))

## Version 0.14b0

Released 2020-10-13
Expand Down
1 change: 1 addition & 0 deletions exporter/opentelemetry-exporter-zipkin/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ package_dir=
=src
packages=find_namespace:
install_requires =
protobuf >= 3.12
requests ~= 2.7
opentelemetry-api == 0.16.dev0
opentelemetry-sdk == 0.16.dev0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. _Specification: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#zipkin-exporter
.. envvar:: OTEL_EXPORTER_ZIPKIN_ENDPOINT
.. envvar:: OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT
.. code:: python
from opentelemetry import trace
Expand Down Expand Up @@ -55,36 +58,54 @@
with tracer.start_as_current_span("foo"):
print("Hello world!")
The exporter supports endpoint configuration via the OTEL_EXPORTER_ZIPKIN_ENDPOINT environment variables as defined in the `Specification`_
The exporter supports the following environment variables for configuration:
:envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`: target to which the exporter will
send data. This may include a path (e.g. http://example.com:9411/api/v2/spans).
:envvar:`OTEL_EXPORTER_ZIPKIN_TRANSPORT_FORMAT`: transport interchange format
to use when sending data. Currently only Zipkin's v2 json and protobuf formats
are supported, with v2 json being the default.
API
---
"""

import json
import logging
import os
from typing import Optional, Sequence
from typing import Optional, Sequence, Union
from urllib.parse import urlparse

import requests

from opentelemetry.configuration import Configuration
from opentelemetry.exporter.zipkin.gen import zipkin_pb2
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span, SpanContext, SpanKind

TRANSPORT_FORMAT_JSON = "json"
TRANSPORT_FORMAT_PROTOBUF = "protobuf"

DEFAULT_RETRY = False
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
ZIPKIN_HEADERS = {"Content-Type": "application/json"}

SPAN_KIND_MAP = {
SPAN_KIND_MAP_JSON = {
SpanKind.INTERNAL: None,
SpanKind.SERVER: "SERVER",
SpanKind.CLIENT: "CLIENT",
SpanKind.PRODUCER: "PRODUCER",
SpanKind.CONSUMER: "CONSUMER",
}

SPAN_KIND_MAP_PROTOBUF = {
SpanKind.INTERNAL: zipkin_pb2.Span.Kind.SPAN_KIND_UNSPECIFIED,
SpanKind.SERVER: zipkin_pb2.Span.Kind.SERVER,
SpanKind.CLIENT: zipkin_pb2.Span.Kind.CLIENT,
SpanKind.PRODUCER: zipkin_pb2.Span.Kind.PRODUCER,
SpanKind.CONSUMER: zipkin_pb2.Span.Kind.CONSUMER,
}

SUCCESS_STATUS_CODES = (200, 202)

logger = logging.getLogger(__name__)
Expand All @@ -100,6 +121,7 @@ class ZipkinSpanExporter(SpanExporter):
ipv4: Primary IPv4 address associated with this connection.
ipv6: Primary IPv6 address associated with this connection.
retry: Set to True to configure the exporter to retry on failure.
transport_format: transport interchange format to use
"""

def __init__(
Expand All @@ -110,12 +132,13 @@ def __init__(
ipv6: Optional[str] = None,
retry: Optional[str] = DEFAULT_RETRY,
max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH,
transport_format: Union[
TRANSPORT_FORMAT_JSON, TRANSPORT_FORMAT_PROTOBUF, None
] = None,
):
self.service_name = service_name
if url is None:
self.url = os.environ.get(
"OTEL_EXPORTER_ZIPKIN_ENDPOINT", DEFAULT_URL
)
self.url = Configuration().EXPORTER_ZIPKIN_ENDPOINT or DEFAULT_URL
else:
self.url = url

Expand All @@ -126,10 +149,27 @@ def __init__(
self.retry = retry
self.max_tag_value_length = max_tag_value_length

if transport_format is None:
self.transport_format = (
Configuration().EXPORTER_ZIPKIN_TRANSPORT_FORMAT
or TRANSPORT_FORMAT_JSON
)
else:
self.transport_format = transport_format

def export(self, spans: Sequence[Span]) -> SpanExportResult:
zipkin_spans = self._translate_to_zipkin(spans)
if self.transport_format == TRANSPORT_FORMAT_JSON:
content_type = "application/json"
elif self.transport_format == TRANSPORT_FORMAT_PROTOBUF:
content_type = "application/x-protobuf"
else:
logger.error("Invalid transport format %s", self.transport_format)
return SpanExportResult.FAILURE

result = requests.post(
url=self.url, data=json.dumps(zipkin_spans), headers=ZIPKIN_HEADERS
url=self.url,
data=self._translate_to_transport_format(spans),
headers={"Content-Type": content_type},
)

if result.status_code not in SUCCESS_STATUS_CODES:
Expand All @@ -147,8 +187,14 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
def shutdown(self) -> None:
pass

def _translate_to_zipkin(self, spans: Sequence[Span]):
def _translate_to_transport_format(self, spans: Sequence[Span]):
return (
self._translate_to_json(spans)
if self.transport_format == TRANSPORT_FORMAT_JSON
else self._translate_to_protobuf(spans)
)

def _translate_to_json(self, spans: Sequence[Span]):
local_endpoint = {"serviceName": self.service_name, "port": self.port}

if self.ipv4 is not None:
Expand All @@ -165,8 +211,8 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):

# Timestamp in zipkin spans is int of microseconds.
# see: https://zipkin.io/pages/instrumenting.html
start_timestamp_mus = _nsec_to_usec_round(span.start_time)
duration_mus = _nsec_to_usec_round(span.end_time - span.start_time)
start_timestamp_mus = nsec_to_usec_round(span.start_time)
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)

zipkin_span = {
# Ensure left-zero-padding of traceId, spanId, parentId
Expand All @@ -176,7 +222,7 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
"timestamp": start_timestamp_mus,
"duration": duration_mus,
"localEndpoint": local_endpoint,
"kind": SPAN_KIND_MAP[span.kind],
"kind": SPAN_KIND_MAP_JSON[span.kind],
"tags": self._extract_tags_from_span(span),
"annotations": self._extract_annotations_from_events(
span.events
Expand Down Expand Up @@ -211,7 +257,94 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
zipkin_span["parentId"] = format(span.parent.span_id, "016x")

zipkin_spans.append(zipkin_span)
return zipkin_spans

return json.dumps(zipkin_spans)

def _translate_to_protobuf(self, spans: Sequence[Span]):

local_endpoint = zipkin_pb2.Endpoint(
service_name=self.service_name, port=self.port
)

if self.ipv4 is not None:
local_endpoint.ipv4 = self.ipv4

if self.ipv6 is not None:
local_endpoint.ipv6 = self.ipv6

pbuf_spans = zipkin_pb2.ListOfSpans()

for span in spans:
context = span.get_span_context()
trace_id = context.trace_id.to_bytes(
length=16, byteorder="big", signed=False,
)
span_id = self.format_pbuf_span_id(context.span_id)

# Timestamp in zipkin spans is int of microseconds.
# see: https://zipkin.io/pages/instrumenting.html
start_timestamp_mus = nsec_to_usec_round(span.start_time)
duration_mus = nsec_to_usec_round(span.end_time - span.start_time)

# pylint: disable=no-member
pbuf_span = zipkin_pb2.Span(
trace_id=trace_id,
id=span_id,
name=span.name,
timestamp=start_timestamp_mus,
duration=duration_mus,
local_endpoint=local_endpoint,
kind=SPAN_KIND_MAP_PROTOBUF[span.kind],
tags=self._extract_tags_from_span(span),
)

annotations = self._extract_annotations_from_events(span.events)

if annotations is not None:
for annotation in annotations:
pbuf_span.annotations.append(
zipkin_pb2.Annotation(
timestamp=annotation["timestamp"],
value=annotation["value"],
)
)

if span.instrumentation_info is not None:
pbuf_span.tags.update(
{
"otel.instrumentation_library.name": span.instrumentation_info.name,
"otel.instrumentation_library.version": span.instrumentation_info.version,
}
)

if span.status is not None:
pbuf_span.tags.update(
{"otel.status_code": str(span.status.status_code.value)}
)
if span.status.description is not None:
pbuf_span.tags.update(
{"otel.status_description": span.status.description}
)

if context.trace_flags.sampled:
pbuf_span.debug = True

if isinstance(span.parent, Span):
pbuf_span.parent_id = self.format_pbuf_span_id(
span.parent.get_span_context().span_id
)
elif isinstance(span.parent, SpanContext):
pbuf_span.parent_id = self.format_pbuf_span_id(
span.parent.span_id
)

pbuf_spans.spans.append(pbuf_span)

return pbuf_spans.SerializeToString()

@staticmethod
def format_pbuf_span_id(span_id: int):
return span_id.to_bytes(length=8, byteorder="big", signed=False)

def _extract_tags_from_dict(self, tags_dict):
tags = {}
Expand Down Expand Up @@ -251,13 +384,13 @@ def _extract_annotations_from_events(self, events):

annotations.append(
{
"timestamp": _nsec_to_usec_round(event.timestamp),
"timestamp": nsec_to_usec_round(event.timestamp),
"value": json.dumps({event.name: attrs}),
}
)
return annotations


def _nsec_to_usec_round(nsec):
def nsec_to_usec_round(nsec):
"""Round nanoseconds to microseconds"""
return (nsec + 500) // 10 ** 3
Empty file.
Loading

0 comments on commit 8ca9e46

Please sign in to comment.