Skip to content

Commit

Permalink
Simple fix for open-telemetry#331: Add atomic_flag protection around …
Browse files Browse the repository at this point in the history
…exporter usage in SimpleProcessor. (open-telemetry#358)
  • Loading branch information
jsuereth committed Oct 12, 2020
1 parent 5b72b00 commit 0f3a5c8
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 25 deletions.
54 changes: 54 additions & 0 deletions api/include/opentelemetry/common/spin_lock_mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include <atomic>

#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace common
{

/**
* A Mutex which uses atomic flags and spin-locks instead of halting threads.
*
* This class implements the `BasicLockable` specification:
* https://en.cppreference.com/w/cpp/named_req/BasicLockable
*/
class SpinLockMutex
{
public:
SpinLockMutex() noexcept {}
~SpinLockMutex() noexcept = default;
SpinLockMutex(const SpinLockMutex &) = delete;
SpinLockMutex &operator=(const SpinLockMutex &) = delete;
SpinLockMutex &operator=(const SpinLockMutex &) volatile = delete;

/**
* Blocks until a lock can be obtained for the current thread.
*
* This mutex will spin the current CPU waiting for the lock to be available. This can have
* decent performance in scenarios where there is low lock contention and lock-holders acheive
* their work quickly. It degrades in scenarios where locked tasks take a long time.
*/
void lock() noexcept
{
/* Note: We expect code protected by this lock to be "fast", i.e. we do NOT incrementally
* back-off and wait/notify here, we just loop until we have access, then try again.
*
* This has the downside that we could be spinning a long time if the exporter is slow.
* Note: in C++20x we could use `.wait` to make this slightly better. This should move to
* an exponential-back-off / wait strategy.
*/
while (flag_.test_and_set(std::memory_order_acquire))
/** TODO - We should immmediately yield if the machine is single processor. */
;
}
/** Releases the lock held by the execution agent. Throws no exceptions. */
void unlock() noexcept { flag_.clear(std::memory_order_release); }

private:
std::atomic_flag flag_{ATOMIC_FLAG_INIT};
};

} // namespace common
OPENTELEMETRY_END_NAMESPACE
19 changes: 7 additions & 12 deletions api/include/opentelemetry/metrics/provider.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include <atomic>
#include <mutex>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/metrics/meter_provider.h"
#include "opentelemetry/metrics/noop.h"
#include "opentelemetry/nostd/shared_ptr.h"
Expand All @@ -23,23 +24,17 @@ class Provider
*/
static nostd::shared_ptr<MeterProvider> GetMeterProvider() noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
auto provider = nostd::shared_ptr<MeterProvider>(GetProvider());
GetLock().clear(std::memory_order_release);

return provider;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
return nostd::shared_ptr<MeterProvider>(GetProvider());
}

/**
* Changes the singleton MeterProvider.
*/
static void SetMeterProvider(nostd::shared_ptr<MeterProvider> tp) noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
GetProvider() = tp;
GetLock().clear(std::memory_order_release);
}

private:
Expand All @@ -49,9 +44,9 @@ class Provider
return provider;
}

static std::atomic_flag &GetLock() noexcept
static common::SpinLockMutex &GetLock() noexcept
{
static std::atomic_flag lock = ATOMIC_FLAG_INIT;
static common::SpinLockMutex lock;
return lock;
}
};
Expand Down
19 changes: 7 additions & 12 deletions api/include/opentelemetry/trace/provider.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include <atomic>
#include <mutex>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/trace/noop.h"
#include "opentelemetry/trace/tracer_provider.h"
Expand All @@ -23,23 +24,17 @@ class Provider
*/
static nostd::shared_ptr<TracerProvider> GetTracerProvider() noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
auto provider = nostd::shared_ptr<TracerProvider>(GetProvider());
GetLock().clear(std::memory_order_release);

return provider;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
return nostd::shared_ptr<TracerProvider>(GetProvider());
}

/**
* Changes the singleton TracerProvider.
*/
static void SetTracerProvider(nostd::shared_ptr<TracerProvider> tp) noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
GetProvider() = tp;
GetLock().clear(std::memory_order_release);
}

private:
Expand All @@ -49,9 +44,9 @@ class Provider
return provider;
}

static std::atomic_flag &GetLock() noexcept
static common::SpinLockMutex &GetLock() noexcept
{
static std::atomic_flag lock = ATOMIC_FLAG_INIT;
static common::SpinLockMutex lock;
return lock;
}
};
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class SpanExporter
* custom recordables or use the default SpanData recordable provided by the
* SDK.
* @return a newly initialized Recordable object
*
* Note: This method must be callable from multiple threads.
*/
virtual std::unique_ptr<Recordable> MakeRecordable() noexcept = 0;

Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class SpanProcessor
* Create a span recordable. This requests a new span recordable from the
* associated exporter.
* @return a newly initialized recordable
*
* Note: This method must be callable from multiple threads.
*/
virtual std::unique_ptr<Recordable> MakeRecordable() noexcept = 0;

Expand Down
16 changes: 15 additions & 1 deletion sdk/include/opentelemetry/sdk/trace/simple_processor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#pragma once

#include <atomic>
#include <mutex>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/processor.h"

Expand All @@ -13,6 +17,9 @@ namespace trace
* SpanExporter, as soon as they are finished.
*
* OnEnd and ForceFlush are no-ops.
*
* All calls to the configured SpanExporter are synchronized using a
* spin-lock on an atomic_flag.
*/
class SimpleSpanProcessor : public SpanProcessor
{
Expand All @@ -35,6 +42,7 @@ class SimpleSpanProcessor : public SpanProcessor
void OnEnd(std::unique_ptr<Recordable> &&span) noexcept override
{
nostd::span<std::unique_ptr<Recordable>> batch(&span, 1);
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
if (exporter_->Export(batch) == ExportResult::kFailure)
{
/* Once it is defined how the SDK does logging, an error should be
Expand All @@ -48,11 +56,17 @@ class SimpleSpanProcessor : public SpanProcessor

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{
exporter_->Shutdown(timeout);
// We only call shutdown ONCE.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
{
exporter_->Shutdown(timeout);
}
}

private:
std::unique_ptr<SpanExporter> exporter_;
opentelemetry::common::SpinLockMutex lock_;
std::atomic_flag shutdown_latch_{ATOMIC_FLAG_INIT};
};
} // namespace trace
} // namespace sdk
Expand Down
39 changes: 39 additions & 0 deletions sdk/test/trace/simple_processor_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "opentelemetry/sdk/trace/simple_processor.h"
#include "opentelemetry/exporters/memory/in_memory_span_exporter.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/span_data.h"

#include <gtest/gtest.h>
Expand All @@ -27,3 +28,41 @@ TEST(SimpleProcessor, ToInMemorySpanExporter)

processor.Shutdown();
}

// An exporter that does nothing but record (and give back ) the # of times Shutdown was called.
class RecordShutdownExporter final : public SpanExporter
{
public:
RecordShutdownExporter(int *shutdown_counter) : shutdown_counter_(shutdown_counter) {}

std::unique_ptr<Recordable> MakeRecordable() noexcept override
{
return std::unique_ptr<Recordable>(new SpanData());
}

ExportResult Export(
const opentelemetry::nostd::span<std::unique_ptr<Recordable>> &recordables) noexcept override
{
return ExportResult::kSuccess;
}

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{
*shutdown_counter_ += 1;
}

private:
int *shutdown_counter_;
};

TEST(SimpleSpanProcessor, ShutdownCalledOnce)
{
int shutdowns = 0;
std::unique_ptr<RecordShutdownExporter> exporter(new RecordShutdownExporter(&shutdowns));
SimpleSpanProcessor processor(std::move(exporter));
EXPECT_EQ(0, shutdowns);
processor.Shutdown();
EXPECT_EQ(1, shutdowns);
processor.Shutdown();
EXPECT_EQ(1, shutdowns);
}

0 comments on commit 0f3a5c8

Please sign in to comment.