Skip to content

Commit

Permalink
opentracing-shim: add testbed for otshim (#727)
Browse files Browse the repository at this point in the history
This commit ports the OpenTracing testbed[1] to check that the ot-shim is
working as expected using different frameworks.

Gevent doesn't support context vars yet[2], so those tests are not compatible
with opentelemetry and were not ported.

[1] https://github.com/opentracing/opentracing-python/tree/master/testbed
[2] gevent/gevent#1407

Co-authored-by: Mauricio Vásquez <mauricio@kinvolk.io>
Co-authored-by: alrex <aboten@lightstep.com>
  • Loading branch information
3 people committed Jun 5, 2020
1 parent 1086c83 commit 5c03a97
Show file tree
Hide file tree
Showing 43 changed files with 1,506 additions and 9 deletions.
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pytest!=5.2.3
pytest-cov>=2.8
readme-renderer~=24.0
httpretty~=1.0
opentracing~=2.2.0
47 changes: 47 additions & 0 deletions ext/opentelemetry-ext-opentracing-shim/tests/testbed/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

Testbed suite for the OpenTelemetry-OpenTracing Bridge
======================================================

Testbed suite designed to test the API changes.

Build and test.
---------------

.. code-block:: sh
tox -e py37-test-opentracing-shim
Alternatively, due to the organization of the suite, it's possible to run directly the tests using ``py.test``\ :

.. code-block:: sh
py.test -s testbed/test_multiple_callbacks/test_threads.py
Tested frameworks
-----------------

Currently the examples cover ``threading`` and ``asyncio``.

List of patterns
----------------


* `Active Span replacement <test_active_span_replacement>`_ - Start an isolated task and query for its results in another task/thread.
* `Client-Server <test_client_server>`_ - Typical client-server example.
* `Common Request Handler <test_common_request_handler>`_ - One request handler for all requests.
* `Late Span finish <test_late_span_finish>`_ - Late parent ``Span`` finish.
* `Multiple callbacks <test_multiple_callbacks>`_ - Multiple callbacks spawned at the same time.
* `Nested callbacks <test_nested_callbacks>`_ - One callback at a time, defined in a pipeline fashion.
* `Subtask Span propagation <test_subtask_span_propagation>`_ - ``Span`` propagation for subtasks/coroutines.

Adding new patterns
-------------------

A new pattern is composed of a directory under *testbed* with the *test_* prefix, and containing the files for each platform, also with the *test_* prefix:

.. code-block::
testbed/
test_new_pattern/
test_threads.py
test_asyncio.py
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import opentelemetry.ext.opentracing_shim as opentracingshim
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)


class MockTracer(opentracingshim.TracerShim):
"""Wrapper of `opentracingshim.TracerShim`.
MockTracer extends `opentracingshim.TracerShim` by adding a in memory
span exporter that can be used to get the list of finished spans."""

def __init__(self):
tracer_provider = trace.TracerProvider()
oteltracer = tracer_provider.get_tracer(__name__)
super(MockTracer, self).__init__(oteltracer)
exporter = InMemorySpanExporter()
span_processor = SimpleExportSpanProcessor(exporter)
tracer_provider.add_span_processor(span_processor)

self.exporter = exporter

def finished_spans(self):
return self.exporter.get_finished_spans()
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

Active Span replacement example.
================================

This example shows a ``Span`` being created and then passed to an asynchronous task, which will temporary activate it to finish its processing, and further restore the previously active ``Span``.

``threading`` implementation:

.. code-block:: python
# Create a new Span for this task
with self.tracer.start_active_span("task"):
with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass
# Use the task span as parent of a new subtask
with self.tracer.start_active_span("subtask"):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import print_function

import asyncio

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase
from ..utils import stop_loop_when


class TestAsyncio(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
self.loop = asyncio.get_event_loop()

def test_main(self):
# Start an isolated task and query for its result -and finish it-
# in another task/thread
span = self.tracer.start_span("initial")
self.submit_another_task(span)

stop_loop_when(
self.loop,
lambda: len(self.tracer.finished_spans()) >= 3,
timeout=5.0,
)
self.loop.run_forever()

spans = self.tracer.finished_spans()
self.assertEqual(len(spans), 3)
self.assertNamesEqual(spans, ["initial", "subtask", "task"])

# task/subtask are part of the same trace,
# and subtask is a child of task
self.assertSameTrace(spans[1], spans[2])
self.assertIsChildOf(spans[1], spans[2])

# initial task is not related in any way to those two tasks
self.assertNotSameTrace(spans[0], spans[1])
self.assertEqual(spans[0].parent, None)

async def task(self, span):
# Create a new Span for this task
with self.tracer.start_active_span("task"):

with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass

# Use the task span as parent of a new subtask
with self.tracer.start_active_span("subtask"):
pass

def submit_another_task(self, span):
self.loop.create_task(self.task(span))
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import print_function

from concurrent.futures import ThreadPoolExecutor

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase


class TestThreads(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
# use max_workers=3 as a general example even if only one would suffice
self.executor = ThreadPoolExecutor(max_workers=3)

def test_main(self):
# Start an isolated task and query for its result -and finish it-
# in another task/thread
span = self.tracer.start_span("initial")
self.submit_another_task(span)

self.executor.shutdown(True)

spans = self.tracer.finished_spans()
self.assertEqual(len(spans), 3)
self.assertNamesEqual(spans, ["initial", "subtask", "task"])

# task/subtask are part of the same trace,
# and subtask is a child of task
self.assertSameTrace(spans[1], spans[2])
self.assertIsChildOf(spans[1], spans[2])

# initial task is not related in any way to those two tasks
self.assertNotSameTrace(spans[0], spans[1])
self.assertEqual(spans[0].parent, None)
self.assertEqual(spans[2].parent, None)

def task(self, span):
# Create a new Span for this task
with self.tracer.start_active_span("task"):

with self.tracer.scope_manager.activate(span, True):
# Simulate work strictly related to the initial Span
pass

# Use the task span as parent of a new subtask
with self.tracer.start_active_span("subtask"):
pass

def submit_another_task(self, span):
self.executor.submit(self.task, span)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

Client-Server example.
======================

This example shows a ``Span`` created by a ``Client``, which will send a ``Message`` / ``SpanContext`` to a ``Server``, which will in turn extract such context and use it as parent of a new (server-side) ``Span``.

``Client.send()`` is used to send messages and inject the ``SpanContext`` using the ``TEXT_MAP`` format, and ``Server.process()`` will process received messages and will extract the context used as parent.

.. code-block:: python
def send(self):
with self.tracer.start_active_span("send") as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
message = {}
self.tracer.inject(scope.span.context,
opentracing.Format.TEXT_MAP,
message)
self.queue.put(message)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import print_function

import asyncio

import opentracing
from opentracing.ext import tags

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase
from ..utils import get_logger, get_one_by_tag, stop_loop_when

logger = get_logger(__name__)


class Server:
def __init__(self, *args, **kwargs):
tracer = kwargs.pop("tracer")
queue = kwargs.pop("queue")
super(Server, self).__init__(*args, **kwargs)

self.tracer = tracer
self.queue = queue

async def run(self):
value = await self.queue.get()
self.process(value)

def process(self, message):
logger.info("Processing message in server")

ctx = self.tracer.extract(opentracing.Format.TEXT_MAP, message)
with self.tracer.start_active_span("receive", child_of=ctx) as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)


class Client:
def __init__(self, tracer, queue):
self.tracer = tracer
self.queue = queue

async def send(self):
with self.tracer.start_active_span("send") as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

message = {}
self.tracer.inject(
scope.span.context, opentracing.Format.TEXT_MAP, message
)
await self.queue.put(message)

logger.info("Sent message from client")


class TestAsyncio(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
self.queue = asyncio.Queue()
self.loop = asyncio.get_event_loop()
self.server = Server(tracer=self.tracer, queue=self.queue)

def test(self):
client = Client(self.tracer, self.queue)
self.loop.create_task(self.server.run())
self.loop.create_task(client.send())

stop_loop_when(
self.loop,
lambda: len(self.tracer.finished_spans()) >= 2,
timeout=5.0,
)
self.loop.run_forever()

spans = self.tracer.finished_spans()
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
)
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import print_function

from queue import Queue
from threading import Thread

import opentracing
from opentracing.ext import tags

from ..otel_ot_shim_tracer import MockTracer
from ..testcase import OpenTelemetryTestCase
from ..utils import await_until, get_logger, get_one_by_tag

logger = get_logger(__name__)


class Server(Thread):
def __init__(self, *args, **kwargs):
tracer = kwargs.pop("tracer")
queue = kwargs.pop("queue")
super(Server, self).__init__(*args, **kwargs)

self.daemon = True
self.tracer = tracer
self.queue = queue

def run(self):
value = self.queue.get()
self.process(value)

def process(self, message):
logger.info("Processing message in server")

ctx = self.tracer.extract(opentracing.Format.TEXT_MAP, message)
with self.tracer.start_active_span("receive", child_of=ctx) as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)


class Client:
def __init__(self, tracer, queue):
self.tracer = tracer
self.queue = queue

def send(self):
with self.tracer.start_active_span("send") as scope:
scope.span.set_tag(tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)

message = {}
self.tracer.inject(
scope.span.context, opentracing.Format.TEXT_MAP, message
)
self.queue.put(message)

logger.info("Sent message from client")


class TestThreads(OpenTelemetryTestCase):
def setUp(self):
self.tracer = MockTracer()
self.queue = Queue()
self.server = Server(tracer=self.tracer, queue=self.queue)
self.server.start()

def test(self):
client = Client(self.tracer, self.queue)
client.send()

await_until(lambda: len(self.tracer.finished_spans()) >= 2)

spans = self.tracer.finished_spans()
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_SERVER)
)
self.assertIsNotNone(
get_one_by_tag(spans, tags.SPAN_KIND, tags.SPAN_KIND_RPC_CLIENT)
)
Loading

0 comments on commit 5c03a97

Please sign in to comment.