Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDTEST-409] Send telemetry events in batches #3749

Merged
merged 8 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 73 additions & 26 deletions lib/datadog/core/telemetry/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
module Datadog
module Core
module Telemetry
# Collection of telemetry events
class Event
extend Core::Utils::Forking

# returns sequence that increments every time the configuration changes
def self.configuration_sequence
after_fork! { @sequence = Datadog::Core::Utils::Sequence.new(1) }
@sequence ||= Datadog::Core::Utils::Sequence.new(1)
end

# Base class for all Telemetry V2 events.
class Base
# The type of the event.
Expand All @@ -12,8 +21,7 @@ class Base
def type; end

# The JSON payload for the event.
# @param seq_id [Integer] The sequence ID for the event.
def payload(seq_id)
def payload
{}
end
end
Expand All @@ -24,8 +32,7 @@ def type
'app-started'
end

def payload(seq_id)
@seq_id = seq_id
def payload
{
products: products,
configuration: configuration,
Expand Down Expand Up @@ -80,16 +87,19 @@ def products
].freeze

# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength
def configuration
config = Datadog.configuration
seq_id = Event.configuration_sequence.next

list = [
conf_value('DD_AGENT_HOST', config.agent.host),
conf_value('DD_AGENT_TRANSPORT', agent_transport(config)),
conf_value('DD_TRACE_SAMPLE_RATE', to_value(config.tracing.sampling.default_rate)),
conf_value('DD_AGENT_HOST', config.agent.host, seq_id),
conf_value('DD_AGENT_TRANSPORT', agent_transport(config), seq_id),
conf_value('DD_TRACE_SAMPLE_RATE', to_value(config.tracing.sampling.default_rate), seq_id),
conf_value(
'DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED',
config.tracing.contrib.global_default_service_name.enabled
config.tracing.contrib.global_default_service_name.enabled,
seq_id
),
]

Expand All @@ -98,32 +108,45 @@ def configuration
peer_service_mapping = config.tracing.contrib.peer_service_mapping
peer_service_mapping_str = peer_service_mapping.map { |key, value| "#{key}:#{value}" }.join(',')
end
list << conf_value('DD_TRACE_PEER_SERVICE_MAPPING', peer_service_mapping_str)
list << conf_value('DD_TRACE_PEER_SERVICE_MAPPING', peer_service_mapping_str, seq_id)

# Whitelist of configuration options to send in additional payload object
TARGET_OPTIONS.each do |option|
split_option = option.split('.')
list << conf_value(option, to_value(config.dig(*split_option)))
list << conf_value(option, to_value(config.dig(*split_option)), seq_id)
end

# Add some more custom additional payload values here
list.push(
conf_value('tracing.auto_instrument.enabled', !defined?(Datadog::AutoInstrument::LOADED).nil?),
conf_value('tracing.writer_options.buffer_size', to_value(config.tracing.writer_options[:buffer_size])),
conf_value('tracing.writer_options.flush_interval', to_value(config.tracing.writer_options[:flush_interval])),
conf_value('tracing.opentelemetry.enabled', !defined?(Datadog::OpenTelemetry::LOADED).nil?),
conf_value('tracing.auto_instrument.enabled', !defined?(Datadog::AutoInstrument::LOADED).nil?, seq_id),
conf_value(
'tracing.writer_options.buffer_size',
to_value(config.tracing.writer_options[:buffer_size]),
seq_id
),
conf_value(
'tracing.writer_options.flush_interval',
to_value(config.tracing.writer_options[:flush_interval]),
seq_id
),
conf_value(
'tracing.opentelemetry.enabled',
!defined?(Datadog::OpenTelemetry::LOADED).nil?,
seq_id
),
)
list << conf_value('logger.instance', config.logger.instance.class.to_s) if config.logger.instance
list << conf_value('logger.instance', config.logger.instance.class.to_s, seq_id) if config.logger.instance
if config.respond_to?('appsec')
list << conf_value('appsec.enabled', config.dig('appsec', 'enabled'))
list << conf_value('appsec.sca_enabled', config.dig('appsec', 'sca_enabled'))
list << conf_value('appsec.enabled', config.dig('appsec', 'enabled'), seq_id)
list << conf_value('appsec.sca_enabled', config.dig('appsec', 'sca_enabled'), seq_id)
end
list << conf_value('ci.enabled', config.dig('ci', 'enabled')) if config.respond_to?('ci')
list << conf_value('ci.enabled', config.dig('ci', 'enabled'), seq_id) if config.respond_to?('ci')

list.reject! { |entry| entry[:value].nil? }
list
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength

def agent_transport(config)
adapter = Core::Configuration::AgentSettingsResolver.call(config).adapter
Expand All @@ -134,12 +157,12 @@ def agent_transport(config)
end
end

def conf_value(name, value, origin = 'code')
def conf_value(name, value, seq_id, origin = 'code')
{
name: name,
value: value,
origin: origin,
seq_id: @seq_id,
seq_id: seq_id,
}
end

Expand Down Expand Up @@ -169,7 +192,7 @@ def type
'app-dependencies-loaded'
end

def payload(seq_id)
def payload
{ dependencies: dependencies }
end

Expand All @@ -192,7 +215,7 @@ def type
'app-integrations-change'
end

def payload(seq_id)
def payload
{ integrations: integrations }
end

Expand Down Expand Up @@ -245,18 +268,20 @@ def initialize(changes, origin)
@origin = origin
end

def payload(seq_id)
{ configuration: configuration(seq_id) }
def payload
{ configuration: configuration }
end

def configuration(seq_id)
def configuration
config = Datadog.configuration
seq_id = Event.configuration_sequence.next

res = @changes.map do |name, value|
{
name: name,
value: value,
origin: @origin,
seq_id: seq_id,
}
end

Expand Down Expand Up @@ -299,7 +324,7 @@ def initialize(namespace, metric_series)
@metric_series = metric_series
end

def payload(_)
def payload
{
namespace: @namespace,
series: @metric_series.map(&:to_h)
Expand All @@ -313,6 +338,28 @@ def type
'distributions'
end
end

# Telemetry class for the 'message-batch' event
class MessageBatch
attr_reader :events

def type
'message-batch'
end

def initialize(events)
@events = events
end

def payload
@events.map do |event|
{
request_type: event.type,
payload: event.payload,
}
end
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/core/telemetry/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def build_payload(event, seq_id)
application: application,
debug: false,
host: host,
payload: event.payload(seq_id),
payload: event.payload,
request_type: event.type,
runtime_id: Core::Environment::Identity.id,
seq_id: seq_id,
Expand Down
6 changes: 2 additions & 4 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,11 @@ def perform(*events)
end

def flush_events(events)
return if events.nil?
return if events.nil? || events.empty?
return if !enabled? || !sent_started_event?

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
events.each do |event|
send_event(event)
end
send_event(Event::MessageBatch.new(events))
end

def heartbeat!
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/telemetry/emitter.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Datadog
module Core
module Telemetry
class Emitter
@sequence: Datadog::Core::Utils::Sequence
self.@sequence: Datadog::Core::Utils::Sequence

attr_reader http_transport: untyped

Expand Down
21 changes: 16 additions & 5 deletions sig/datadog/core/telemetry/event.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ module Datadog
module Core
module Telemetry
class Event
extend Core::Utils::Forking

self.@sequence: Datadog::Core::Utils::Sequence

def self.configuration_sequence: () -> Datadog::Core::Utils::Sequence

class Base
def payload: (int seq_id) -> Hash[Symbol, untyped]
def payload: () -> (Hash[Symbol, untyped] | Array[Hash[Symbol, untyped]])
def type: -> String?
end

class AppStarted < Base
TARGET_OPTIONS: Array[String]

@seq_id: int

private

def products: -> Hash[Symbol, untyped]
Expand All @@ -20,7 +24,7 @@ module Datadog

def agent_transport: (untyped config) -> String

def conf_value: (String name, Object value, ?String origin) -> Hash[Symbol, untyped]
def conf_value: (String name, Object value, Integer seq_id, ?String origin) -> Hash[Symbol, untyped]

def to_value: (Object value) -> Object

Expand All @@ -47,7 +51,7 @@ module Datadog

def initialize: (Enumerable[[String, Numeric | bool | String]] changes, String origin) -> void

def configuration: (int seq_id) -> Array[Hash[Symbol, untyped]]
def configuration: () -> Array[Hash[Symbol, untyped]]
end

class AppHeartbeat < Base
Expand All @@ -65,6 +69,13 @@ module Datadog

class Distributions < GenerateMetrics
end

class MessageBatch < Base
attr_reader events: Array[Datadog::Core::Telemetry::Event::Base]
@events: Array[Datadog::Core::Telemetry::Event::Base]

def initialize: (Array[Datadog::Core::Telemetry::Event::Base] events) -> void
end
end
end
end
Expand Down
32 changes: 30 additions & 2 deletions spec/datadog/core/telemetry/event_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

RSpec.describe Datadog::Core::Telemetry::Event do
let(:id) { double('seq_id') }
subject(:payload) { event.payload(id) }
subject(:payload) { event.payload }

context 'AppStarted' do
let(:event) { described_class::AppStarted.new }
Expand All @@ -14,6 +14,8 @@
end

before do
allow_any_instance_of(Datadog::Core::Utils::Sequence).to receive(:next).and_return(id)

Datadog.configure do |c|
c.agent.host = '1.2.3.4'
c.tracing.sampling.default_rate = 0.5
Expand Down Expand Up @@ -164,12 +166,17 @@ def contain_configuration(*array)
let(:name) { 'key' }
let(:value) { 'value' }

before do
allow_any_instance_of(Datadog::Core::Utils::Sequence).to receive(:next).and_return(id)
end

it 'has a list of client configurations' do
is_expected.to eq(
configuration: [{
name: name,
value: value,
origin: origin,
seq_id: id
}]
)
end
Expand All @@ -185,7 +192,7 @@ def contain_configuration(*array)
is_expected.to eq(
configuration:
[
{ name: name, value: value, origin: origin },
{ name: name, value: value, origin: origin, seq_id: id },
{ name: 'appsec.sca_enabled', value: false, origin: 'code', seq_id: id }
]
)
Expand Down Expand Up @@ -252,4 +259,25 @@ def contain_configuration(*array)
)
end
end

context 'MessageBatch' do
let(:event) { described_class::MessageBatch.new(events) }

let(:events) { [described_class::AppClosing.new, described_class::AppHeartbeat.new] }

it do
is_expected.to eq(
[
{
request_type: 'app-closing',
payload: {}
},
{
request_type: 'app-heartbeat',
payload: {}
}
]
)
end
end
end
Loading
Loading