Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDTEST-409] Add metrics management capabilities #3742

Merged
merged 12 commits into from
Jul 4, 2024
Merged
74 changes: 46 additions & 28 deletions lib/datadog/core/telemetry/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,40 @@ module Core
module Telemetry
# Telemetry metrics data model (internal Datadog metrics for client libraries)
module Metric
def self.metric_id(type, name, tags = [])
"#{type}::#{name}::#{tags.join(',')}"
end

# Base class for all metric types
class Base
attr_reader :name, :tags, :values, :common, :interval
attr_reader :name, :tags, :values, :common

# @param name [String] metric name
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
# @param interval [Integer] metrics aggregation interval in seconds
def initialize(name, tags: {}, common: true, interval: nil)
def initialize(name, tags: {}, common: true)
@name = name
@values = []
@tags = tags_to_array(tags)
@common = common
@interval = interval
end

def track(value); end
def id
@id ||= "#{type}::#{name}::#{tags.join(',')}"
end

def type; end
def track(value)
raise NotImplementedError, 'method must be implemented in subclasses'
end

def type
raise NotImplementedError, 'method must be implemented in subclasses'
end

def to_h
# @type var res: Hash[Symbol, untyped]
res = {
{
metric: name,
points: values,
type: type,
tags: tags,
common: common
}
res[:interval] = interval if interval
res
end

private
Expand All @@ -51,6 +50,29 @@ def tags_to_array(tags)
end
end

# Base class for metrics that require aggregation interval
class IntervalMetric < Base
attr_reader :interval

# @param name [String] metric name
# @param tags [Array<String>|Hash{String=>String}] metric tags as hash of array of "tag:val" strings
# @param common [Boolean] true if the metric is common for all languages, false for Ruby-specific metric
# @param interval [Integer] metrics aggregation interval in seconds
def initialize(name, interval:, tags: {}, common: true)
raise ArgumentError, 'interval must be a positive number' if interval.nil? || interval <= 0

super(name, tags: tags, common: common)

@interval = interval
end

def to_h
res = super
res[:interval] = interval
res
end
end

# Count metric adds up all the submitted values in a time interval. This would be suitable for a
# metric tracking the number of website hits, for instance.
class Count < Base
Expand All @@ -60,28 +82,23 @@ def type
TYPE
end

def inc(value = 1)
track(value)
end

def dec(value = 1)
track(-value)
end

def track(value)
value = value.to_i

if values.empty?
values << [Time.now.to_i, value]
else
values[0][0] = Time.now.to_i
values[0][1] += value
end
nil
end
end

# A gauge type takes the last value reported during the interval. This type would make sense for tracking RAM or
# CPU usage, where taking the last value provides a representative picture of the host’s behavior during the time
# interval.
class Gauge < Base
class Gauge < IntervalMetric
TYPE = 'gauge'

def type
Expand All @@ -95,15 +112,16 @@ def track(value)
values[0][0] = Time.now.to_i
values[0][1] = value
end
nil
end
end

# The rate type takes the count and divides it by the length of the time interval. This is useful if you’re
# interested in the number of hits per second.
class Rate < Base
class Rate < IntervalMetric
TYPE = 'rate'

def initialize(name, tags: {}, common: true, interval: nil)
def initialize(name, interval:, tags: {}, common: true)
super

@value = 0.0
Expand All @@ -115,9 +133,8 @@ def type

def track(value = 1.0)
@value += value

rate = interval ? @value / interval : 0.0
@values = [[Time.now.to_i, rate]]
@values = [[Time.now.to_i, @value / interval]]
nil
end
end

Expand All @@ -131,6 +148,7 @@ def type

def track(value)
values << value
nil
end

# distribution metric data does not have type field
Expand Down
79 changes: 79 additions & 0 deletions lib/datadog/core/telemetry/metrics_collection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# frozen_string_literal: true

require_relative 'event'
require_relative 'metric'

module Datadog
module Core
module Telemetry
# MetricsCollection is a thread-safe collection of metrics per namespace
class MetricsCollection
attr_reader :namespace, :interval

def initialize(namespace, aggregation_interval:)
@namespace = namespace
@interval = aggregation_interval

@mutex = Mutex.new

@metrics = {}
@distributions = {}
end

def inc(metric_name, value, tags: {}, common: true)
metric = Metric::Count.new(metric_name, tags: tags, common: common)
fetch_or_add_metric(metric, value)
end

def dec(metric_name, value, tags: {}, common: true)
metric = Metric::Count.new(metric_name, tags: tags, common: common)
fetch_or_add_metric(metric, -value)
end

def gauge(metric_name, value, tags: {}, common: true)
metric = Metric::Gauge.new(metric_name, tags: tags, common: common, interval: @interval)
fetch_or_add_metric(metric, value)
end

def rate(metric_name, value, tags: {}, common: true)
metric = Metric::Rate.new(metric_name, tags: tags, common: common, interval: @interval)
fetch_or_add_metric(metric, value)
end

def distribution(metric_name, value, tags: {}, common: true)
metric = Metric::Distribution.new(metric_name, tags: tags, common: common)
fetch_or_add_distribution(metric, value)
end

def flush!(queue)
@mutex.synchronize do
queue.enqueue(Event::GenerateMetrics.new(@namespace, @metrics.values)) if @metrics.any?
queue.enqueue(Event::Distributions.new(@namespace, @distributions.values)) if @distributions.any?

@metrics = {}
@distributions = {}
end
nil
end

private

def fetch_or_add_metric(metric, value)
@mutex.synchronize do
m = (@metrics[metric.id] ||= metric)
m.track(value)
end
nil
end

def fetch_or_add_distribution(metric, value)
@mutex.synchronize do
m = (@distributions[metric.id] ||= metric)
m.track(value)
end
nil
end
end
end
end
end
83 changes: 83 additions & 0 deletions lib/datadog/core/telemetry/metrics_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# frozen_string_literal: true

require_relative 'metrics_collection'

module Datadog
module Core
module Telemetry
# MetricsManager aggregates and flushes metrics and distributions
class MetricsManager
attr_reader :enabled

def initialize(aggregation_interval:, enabled:)
@interval = aggregation_interval
@enabled = enabled
@mutex = Mutex.new

@collections = {}
end
anmarchenko marked this conversation as resolved.
Show resolved Hide resolved

def inc(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.inc(metric_name, value, tags: tags, common: common)
anmarchenko marked this conversation as resolved.
Show resolved Hide resolved
end

def dec(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.dec(metric_name, value, tags: tags, common: common)
end

def gauge(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.gauge(metric_name, value, tags: tags, common: common)
end

def rate(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.rate(metric_name, value, tags: tags, common: common)
end

def distribution(namespace, metric_name, value, tags: {}, common: true)
return unless @enabled

# collection is thread-safe internally
collection = fetch_or_create_collection(namespace)
collection.distribution(metric_name, value, tags: tags, common: common)
end

def flush!(queue)
return unless @enabled

collections = @mutex.synchronize { @collections.values }
collections.each { |col| col.flush!(queue) }

nil
end

def disable!
@enabled = false
end

private

def fetch_or_create_collection(namespace)
@mutex.synchronize do
@collections[namespace] ||= MetricsCollection.new(namespace, aggregation_interval: @interval)
end
end
end
end
end
end
Loading
Loading