From 0f3a5c820c464763f33eb449b1774ee28f99bc6b Mon Sep 17 00:00:00 2001 From: Josh Suereth Date: Mon, 12 Oct 2020 18:40:41 -0400 Subject: [PATCH] Simple fix for #331: Add atomic_flag protection around exporter usage in SimpleProcessor. (#358) --- .../opentelemetry/common/spin_lock_mutex.h | 54 +++++++++++++++++++ api/include/opentelemetry/metrics/provider.h | 19 +++---- api/include/opentelemetry/trace/provider.h | 19 +++---- .../opentelemetry/sdk/trace/exporter.h | 2 + .../opentelemetry/sdk/trace/processor.h | 2 + .../sdk/trace/simple_processor.h | 16 +++++- sdk/test/trace/simple_processor_test.cc | 39 ++++++++++++++ 7 files changed, 126 insertions(+), 25 deletions(-) create mode 100644 api/include/opentelemetry/common/spin_lock_mutex.h diff --git a/api/include/opentelemetry/common/spin_lock_mutex.h b/api/include/opentelemetry/common/spin_lock_mutex.h new file mode 100644 index 00000000000..cd911e47ca3 --- /dev/null +++ b/api/include/opentelemetry/common/spin_lock_mutex.h @@ -0,0 +1,54 @@ +#pragma once + +#include <atomic> + +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace common +{ + +/** + * A Mutex which uses atomic flags and spin-locks instead of halting threads. + * + * This class implements the `BasicLockable` specification: + * https://en.cppreference.com/w/cpp/named_req/BasicLockable + */ +class SpinLockMutex +{ +public: + SpinLockMutex() noexcept {} + ~SpinLockMutex() noexcept = default; + SpinLockMutex(const SpinLockMutex &) = delete; + SpinLockMutex &operator=(const SpinLockMutex &) = delete; + SpinLockMutex &operator=(const SpinLockMutex &) volatile = delete; + + /** + * Blocks until a lock can be obtained for the current thread. + * + * This mutex will spin the current CPU waiting for the lock to be available. This can have + * decent performance in scenarios where there is low lock contention and lock-holders achieve + * their work quickly. It degrades in scenarios where locked tasks take a long time.
+ */ + void lock() noexcept + { + /* Note: We expect code protected by this lock to be "fast", i.e. we do NOT incrementally + * back-off and wait/notify here, we just loop until we have access, then try again. + * + * This has the downside that we could be spinning a long time if the exporter is slow. + * Note: in C++20 we could use `.wait` to make this slightly better. This should move to + * an exponential-back-off / wait strategy. + */ + while (flag_.test_and_set(std::memory_order_acquire)) + /** TODO - We should immediately yield if the machine is single processor. */ + ; + } + /** Releases the lock held by the execution agent. Throws no exceptions. */ + void unlock() noexcept { flag_.clear(std::memory_order_release); } + +private: + std::atomic_flag flag_{ATOMIC_FLAG_INIT}; +}; + +} // namespace common +OPENTELEMETRY_END_NAMESPACE diff --git a/api/include/opentelemetry/metrics/provider.h b/api/include/opentelemetry/metrics/provider.h index 8559562d8de..88b97fdf5fe 100644 --- a/api/include/opentelemetry/metrics/provider.h +++ b/api/include/opentelemetry/metrics/provider.h @@ -1,7 +1,8 @@ #pragma once -#include <atomic> +#include <mutex> +#include "opentelemetry/common/spin_lock_mutex.h" #include "opentelemetry/metrics/meter_provider.h" #include "opentelemetry/metrics/noop.h" #include "opentelemetry/nostd/shared_ptr.h" @@ -23,12 +24,8 @@ class Provider */ static nostd::shared_ptr GetMeterProvider() noexcept { - while (GetLock().test_and_set(std::memory_order_acquire)) - ; - auto provider = nostd::shared_ptr(GetProvider()); - GetLock().clear(std::memory_order_release); - - return provider; + std::lock_guard guard(GetLock()); + return nostd::shared_ptr(GetProvider()); } /** @@ -36,10 +33,8 @@ class Provider */ static void SetMeterProvider(nostd::shared_ptr tp) noexcept { - while (GetLock().test_and_set(std::memory_order_acquire)) - ; + std::lock_guard guard(GetLock()); GetProvider() = tp; - GetLock().clear(std::memory_order_release); } private: @@ -49,9 +44,9 @@ class Provider
return provider; } - static std::atomic_flag &GetLock() noexcept + static common::SpinLockMutex &GetLock() noexcept { - static std::atomic_flag lock = ATOMIC_FLAG_INIT; + static common::SpinLockMutex lock; return lock; } }; diff --git a/api/include/opentelemetry/trace/provider.h b/api/include/opentelemetry/trace/provider.h index 0cfed6d1658..c78c4ad90c8 100644 --- a/api/include/opentelemetry/trace/provider.h +++ b/api/include/opentelemetry/trace/provider.h @@ -1,7 +1,8 @@ #pragma once -#include +#include +#include "opentelemetry/common/spin_lock_mutex.h" #include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/trace/noop.h" #include "opentelemetry/trace/tracer_provider.h" @@ -23,12 +24,8 @@ class Provider */ static nostd::shared_ptr GetTracerProvider() noexcept { - while (GetLock().test_and_set(std::memory_order_acquire)) - ; - auto provider = nostd::shared_ptr(GetProvider()); - GetLock().clear(std::memory_order_release); - - return provider; + std::lock_guard guard(GetLock()); + return nostd::shared_ptr(GetProvider()); } /** @@ -36,10 +33,8 @@ class Provider */ static void SetTracerProvider(nostd::shared_ptr tp) noexcept { - while (GetLock().test_and_set(std::memory_order_acquire)) - ; + std::lock_guard guard(GetLock()); GetProvider() = tp; - GetLock().clear(std::memory_order_release); } private: @@ -49,9 +44,9 @@ class Provider return provider; } - static std::atomic_flag &GetLock() noexcept + static common::SpinLockMutex &GetLock() noexcept { - static std::atomic_flag lock = ATOMIC_FLAG_INIT; + static common::SpinLockMutex lock; return lock; } }; diff --git a/sdk/include/opentelemetry/sdk/trace/exporter.h b/sdk/include/opentelemetry/sdk/trace/exporter.h index ec5473edb80..d7eb6f532aa 100644 --- a/sdk/include/opentelemetry/sdk/trace/exporter.h +++ b/sdk/include/opentelemetry/sdk/trace/exporter.h @@ -39,6 +39,8 @@ class SpanExporter * custom recordables or use the default SpanData recordable provided by the * SDK. 
* @return a newly initialized Recordable object + * + * Note: This method must be callable from multiple threads. */ virtual std::unique_ptr MakeRecordable() noexcept = 0; diff --git a/sdk/include/opentelemetry/sdk/trace/processor.h b/sdk/include/opentelemetry/sdk/trace/processor.h index 04f9f8ab9d3..ebd37f6218b 100644 --- a/sdk/include/opentelemetry/sdk/trace/processor.h +++ b/sdk/include/opentelemetry/sdk/trace/processor.h @@ -24,6 +24,8 @@ class SpanProcessor * Create a span recordable. This requests a new span recordable from the * associated exporter. * @return a newly initialized recordable + * + * Note: This method must be callable from multiple threads. */ virtual std::unique_ptr MakeRecordable() noexcept = 0; diff --git a/sdk/include/opentelemetry/sdk/trace/simple_processor.h b/sdk/include/opentelemetry/sdk/trace/simple_processor.h index c9cc617fa6d..e10d8fa5151 100644 --- a/sdk/include/opentelemetry/sdk/trace/simple_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/simple_processor.h @@ -1,5 +1,9 @@ #pragma once +#include <atomic> +#include <mutex> + +#include "opentelemetry/common/spin_lock_mutex.h" #include "opentelemetry/sdk/trace/exporter.h" #include "opentelemetry/sdk/trace/processor.h" @@ -13,6 +17,9 @@ namespace trace * SpanExporter, as soon as they are finished. * * OnEnd and ForceFlush are no-ops. + * + * All calls to the configured SpanExporter are synchronized using a + * spin-lock on an atomic_flag.
*/ class SimpleSpanProcessor : public SpanProcessor { @@ -35,6 +42,7 @@ class SimpleSpanProcessor : public SpanProcessor void OnEnd(std::unique_ptr &&span) noexcept override { nostd::span> batch(&span, 1); + const std::lock_guard locked(lock_); if (exporter_->Export(batch) == ExportResult::kFailure) { /* Once it is defined how the SDK does logging, an error should be @@ -48,11 +56,17 @@ class SimpleSpanProcessor : public SpanProcessor void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override { - exporter_->Shutdown(timeout); + // We only call shutdown ONCE. + if (!shutdown_latch_.test_and_set(std::memory_order_acquire)) + { + exporter_->Shutdown(timeout); + } } private: std::unique_ptr exporter_; + opentelemetry::common::SpinLockMutex lock_; + std::atomic_flag shutdown_latch_{ATOMIC_FLAG_INIT}; }; } // namespace trace } // namespace sdk diff --git a/sdk/test/trace/simple_processor_test.cc b/sdk/test/trace/simple_processor_test.cc index d04584b5dde..7b5db6e1eb6 100644 --- a/sdk/test/trace/simple_processor_test.cc +++ b/sdk/test/trace/simple_processor_test.cc @@ -1,6 +1,7 @@ #include "opentelemetry/sdk/trace/simple_processor.h" #include "opentelemetry/exporters/memory/in_memory_span_exporter.h" #include "opentelemetry/nostd/span.h" +#include "opentelemetry/sdk/trace/exporter.h" #include "opentelemetry/sdk/trace/span_data.h" #include @@ -27,3 +28,41 @@ TEST(SimpleProcessor, ToInMemorySpanExporter) processor.Shutdown(); } + +// An exporter that does nothing but record (and give back) the # of times Shutdown was called.
+class RecordShutdownExporter final : public SpanExporter +{ +public: + RecordShutdownExporter(int *shutdown_counter) : shutdown_counter_(shutdown_counter) {} + + std::unique_ptr MakeRecordable() noexcept override + { + return std::unique_ptr(new SpanData()); + } + + ExportResult Export( + const opentelemetry::nostd::span> &recordables) noexcept override + { + return ExportResult::kSuccess; + } + + void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override + { + *shutdown_counter_ += 1; + } + +private: + int *shutdown_counter_; +}; + +TEST(SimpleSpanProcessor, ShutdownCalledOnce) +{ + int shutdowns = 0; + std::unique_ptr exporter(new RecordShutdownExporter(&shutdowns)); + SimpleSpanProcessor processor(std::move(exporter)); + EXPECT_EQ(0, shutdowns); + processor.Shutdown(); + EXPECT_EQ(1, shutdowns); + processor.Shutdown(); + EXPECT_EQ(1, shutdowns); +}