Skip to content

Commit

Permalink
fix: add meter provider implementation and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
robertlaurin committed Feb 22, 2022
1 parent 44a3edb commit f64082e
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 7 deletions.
27 changes: 27 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
ExportError = Class.new(OpenTelemetry::Error)

# The operation finished successfully.
SUCCESS = 0

# The operation finished with an error.
FAILURE = 1

# The operation timed out.
TIMEOUT = 2
end
end
end
end

require 'opentelemetry/sdk/metrics/export/metric_reader'
require 'opentelemetry/sdk/metrics/export/console_exporter'
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
class ConsoleExporter
PREFERRED_TEMPORALITY = 'delta'

def export(metrics)
puts metrics
end

def shutdown
SUCCESS
end

def preferred_temporality
PREFERRED_TEMPORALITY
end
end
end
end
end
end

26 changes: 26 additions & 0 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/export/metric_reader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module SDK
module Metrics
module Export
class MetricReader
def initialize(exporter)
@exporter = exporter
end

def collect; end

def shutdown(timeout: nil)
SUCCESS
end
end
end
end
end
end

98 changes: 91 additions & 7 deletions metrics_sdk/lib/opentelemetry/sdk/metrics/meter_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,106 @@ class MeterProvider < OpenTelemetry::Metrics::MeterProvider
Key = Struct.new(:name, :version)
private_constant(:Key)

def initialize
def initialize(resource: OpenTelemetry::SDK::Resources::Resource.create)
@mutex = Mutex.new
@registry = {}
@registry_mutex = Mutex.new
@stopped = false

@metric_readers = []
@views = []
end

# Returns a {Meter} instance.
#
# @param [optional String] name Instrumentation package name
# @param [String] name Instrumentation package name
# @param [optional String] version Instrumentation package version
#
# @return [Meter]
def meter(name = nil, version = nil)
name ||= ''
def meter(name, version = nil)
version ||= ''
OpenTelemetry.logger.warn 'calling MeterProvider#meter without providing a meter name.' if name.empty?
@registry_mutex.synchronize { @registry[Key.new(name, version)] ||= Meter.new(name, version) }
if @stopped
OpenTelemetry.logger.warn 'calling MeterProvider#meter after shutdown, a noop meter will be returned.'
OpenTelemetry::Metrics::Meter.new
else
@mutex.synchronize { @registry[Key.new(name, version)] ||= Meter.new(name, version, self) }
end
end

# Attempts to stop all the activity for this {MeterProvider}.
#
# Calls MetricReader#shutdown for all registered MetricReaders.
# Calls MetricExporter#shutdown for all registered MetricExporters.
#
# After this is called all the newly created {Meter}s will be no-op.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] Export::SUCCESS if no error occurred, Export::FAILURE if
# a non-specific failure occurred, Export::TIMEOUT if a timeout occurred.
def shutdown(timeout: nil)
@mutex.synchronize do
if @stopped
OpenTelemetry.logger.warn('calling MetricProvider#shutdown multiple times.')
Export::FAILURE
else
start_time = OpenTelemetry::Common::Utilities.timeout_timestamp

results = @metric_readers.map do |metric_reader|
remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
break [Export::TIMEOUT] if remaining_timeout&.zero?
metric_reader.shutdown(timeout: remaining_timeout)
end

@stopped = true
results.max || Export::SUCCESS
end
end
end

# This method provides a way for provider to notify the registered
# {MetricReader} instances, so they can do as much as they could to consume
# or send the metrics. Note: unlike Push Metric Exporter which can send data on
# its own schedule, Pull Metric Exporter can only send the data when it is
# being asked by the scraper, so ForceFlush would not make much sense.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] Export::SUCCESS if no error occurred, Export::FAILURE if
# a non-specific failure occurred, Export::TIMEOUT if a timeout occurred.
def force_flush(timeout: nil)
@mutex.synchronize do
return Export::SUCCESS if @stopped

start_time = OpenTelemetry::Common::Utilities.timeout_timestamp
results = @metric_readers.map do |metric_reader|
remaining_timeout = OpenTelemetry::Common::Utilities.maybe_timeout(timeout, start_time)
return Export::TIMEOUT if remaining_timeout&.zero?

metric_reader.force_flush(timeout: remaining_timeout)
end
results.max || Export::SUCCESS
end
end

# Adds a new MetricReader to this {MeterProvider}.
#
# @param metric_reader the new MetricReader to be added.
def add_metric_reader(metric_reader)
@mutex.synchronize do
if @stopped
OpenTelemetry.logger.warn('calling MetricProvider#add_metric_reader after shutdown.')
return
end

@metric_readers = @metric_readers.push(metric_reader)
end
end

# The type of the Instrument(s) (optional).
# The name of the Instrument(s). OpenTelemetry SDK authors MAY choose to support wildcard characters, with the question mark (?) matching exactly one character and the asterisk character (*) matching zero or more characters.
# The name of the Meter (optional).
# The version of the Meter (optional).
# The schema_url of the Meter (optional).
def add_view
# TODO: For each meter add this view to all applicable instruments
end
end
end
Expand Down
114 changes: 114 additions & 0 deletions metrics_sdk/test/opentelemetry/sdk/metrics/meter_provider_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::SDK::Metrics::MeterProvider do
before do
reset_metrics_sdk
OpenTelemetry::SDK.configure
end

describe '#meter' do
it 'requires a meter name' do
_(-> { OpenTelemetry.meter_provider.meter }).must_raise(ArgumentError)
end

it 'creates a new meter' do
meter = OpenTelemetry.meter_provider.meter('test')

_(meter).must_be_instance_of(OpenTelemetry::SDK::Metrics::Meter)
end

it 'repeated calls does not recreate a meter of the same name' do
meter_a = OpenTelemetry.meter_provider.meter('test')
meter_b = OpenTelemetry.meter_provider.meter('test')

_(meter_a).must_equal(meter_b)
end
end

describe '#shutdown' do
it 'repeated calls to shutdown result in a failure' do
with_test_logger do |log_stream|
_(OpenTelemetry.meter_provider.shutdown).must_equal(OpenTelemetry::SDK::Metrics::Export::SUCCESS)
_(OpenTelemetry.meter_provider.shutdown).must_equal(OpenTelemetry::SDK::Metrics::Export::FAILURE)
_(log_stream.string).must_match(/calling MetricProvider#shutdown multiple times/)
end
end

it 'returns a no-op meter after being shutdown' do
with_test_logger do |log_stream|
OpenTelemetry.meter_provider.shutdown

_(OpenTelemetry.meter_provider.meter('test')).must_be_instance_of(OpenTelemetry::Metrics::Meter)
_(log_stream.string).must_match(/calling MeterProvider#meter after shutdown, a noop meter will be returned/)
end
end

it 'returns a timeout response when it times out' do
mock_metric_reader = Minitest::Mock.new
mock_metric_reader.expect(:nothing_gets_called_because_it_times_out_first, nil)
OpenTelemetry.meter_provider.add_metric_reader(mock_metric_reader)

_(OpenTelemetry.meter_provider.shutdown(timeout: 0)).must_equal(OpenTelemetry::SDK::Metrics::Export::TIMEOUT)
end

it 'invokes shutdown on all registered Metric Readers' do
mock_metric_reader_1 = Minitest::Mock.new
mock_metric_reader_2 = Minitest::Mock.new
mock_metric_reader_1.expect(:shutdown, nil, [{ timeout: nil }])
mock_metric_reader_2.expect(:shutdown, nil, [{ timeout: nil }])

OpenTelemetry.meter_provider.add_metric_reader(mock_metric_reader_1)
OpenTelemetry.meter_provider.add_metric_reader(mock_metric_reader_2)
OpenTelemetry.meter_provider.shutdown

mock_metric_reader_1.verify
mock_metric_reader_2.verify
end
end

describe '#force_flush' do
it 'returns a timeout response when it times out' do
mock_metric_reader = Minitest::Mock.new
mock_metric_reader.expect(:nothing_gets_called_because_it_times_out_first, nil)
OpenTelemetry.meter_provider.add_metric_reader(mock_metric_reader)

_(OpenTelemetry.meter_provider.force_flush(timeout: 0)).must_equal(OpenTelemetry::SDK::Metrics::Export::TIMEOUT)
end

it 'invokes force_flush on all registered Metric Readers' do
mock_metric_reader_1 = Minitest::Mock.new
mock_metric_reader_2 = Minitest::Mock.new
mock_metric_reader_1.expect(:force_flush, nil, [{ timeout: nil }])
mock_metric_reader_2.expect(:force_flush, nil, [{ timeout: nil }])
OpenTelemetry.meter_provider.add_metric_reader(mock_metric_reader_1)
OpenTelemetry.meter_provider.add_metric_reader(mock_metric_reader_2)

OpenTelemetry.meter_provider.force_flush

mock_metric_reader_1.verify
mock_metric_reader_2.verify
end
end

describe '#add_metric_reader' do
it 'adds a metric reader' do
metric_reader = OpenTelemetry::SDK::Metrics::Export::MetricReader.new(
OpenTelemetry::SDK::Metrics::Export::ConsoleExporter.new
)

OpenTelemetry.meter_provider.add_metric_reader(metric_reader)

_(OpenTelemetry.meter_provider.instance_variable_get(:@metric_readers)).must_equal([metric_reader])
end
end

describe '#add_view' do
# TODO
end
end
9 changes: 9 additions & 0 deletions metrics_sdk/test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,12 @@ def reset_metrics_sdk
OpenTelemetry.logger = Logger.new(File::NULL)
OpenTelemetry.error_handler = nil
end

def with_test_logger
log_stream = StringIO.new
original_logger = OpenTelemetry.logger
OpenTelemetry.logger = ::Logger.new(log_stream)
yield log_stream
ensure
OpenTelemetry.logger = original_logger
end

0 comments on commit f64082e

Please sign in to comment.