Skip to content

Commit

Permalink
feat: All AWS services emit traces (#1150)
Browse files Browse the repository at this point in the history
* feat: All V3 services emit traces

* refactor: Clean up Handler and MessageHelper

* Revise based on feedback

* Fix CI

* Correctly handle V2 service clients

* Add comments

* Fix loaded_service check

* Fix loaded_service check to be compatible with V3
  • Loading branch information
jterapin committed Sep 18, 2024
1 parent a174d03 commit 7671f4e
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,32 @@ module Instrumentation
module AwsSdk
# Generates Spans for all interactions with AwsSdk
class Handler < Seahorse::Client::Handler
SQS_SEND_MESSAGE = 'SQS.SendMessage'
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
SNS_PUBLISH = 'SNS.Publish'

def call(context)
return super unless context

service_name = service_name(context)
service_id = service_name(context)
operation = context.operation&.name
client_method = "#{service_name}.#{operation}"
attributes = {
'aws.region' => context.config.region,
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_name
}
attributes[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_name == 'DynamoDB'
MessagingHelper.apply_sqs_attributes(attributes, context, client_method) if service_name == 'SQS'
MessagingHelper.apply_sns_attributes(attributes, context, client_method) if service_name == 'SNS'
client_method = "#{service_id}.#{operation}"

tracer.in_span(
span_name(context, client_method, service_id),
attributes: attributes(context, client_method, service_id, operation),
kind: span_kind(client_method, service_id)
) do |span|
if instrumentation_config[:inject_messaging_context] &&
%w[SQS SNS].include?(service_id)
MessagingHelper.inject_context(context, client_method)
end

tracer.in_span(span_name(context, client_method), attributes: attributes, kind: span_kind(client_method)) do |span|
inject_context(context, client_method)
if instrumentation_config[:suppress_internal_instrumentation]
OpenTelemetry::Common::Utilities.untraced { super }
else
super
end.tap do |response|
span.set_attribute(OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
context.http_response.status_code)
span.set_attribute(
OpenTelemetry::SemanticConventions::Trace::HTTP_STATUS_CODE,
context.http_response.status_code
)

if (err = response.error)
span.record_exception(err)
Expand All @@ -65,48 +61,40 @@ def service_name(context)
context.client.class.api.metadata['serviceId'] || context.client.class.to_s.split('::')[1]
end

SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze
def inject_context(context, client_method)
return unless SEND_MESSAGE_CLIENT_METHODS.include? client_method
return unless instrumentation_config[:inject_messaging_context]

if client_method == SQS_SEND_MESSAGE_BATCH
context.params[:entries].each do |entry|
entry[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
end
def span_kind(client_method, service_id)
case service_id
when 'SQS', 'SNS'
MessagingHelper.span_kind(client_method)
else
context.params[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
OpenTelemetry::Trace::SpanKind::CLIENT
end
end

def span_kind(client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
OpenTelemetry::Trace::SpanKind::PRODUCER
when SQS_RECEIVE_MESSAGE
OpenTelemetry::Trace::SpanKind::CONSUMER
def span_name(context, client_method, service_id)
case service_id
when 'SQS', 'SNS'
MessagingHelper.legacy_span_name(context, client_method)
else
OpenTelemetry::Trace::SpanKind::CLIENT
client_method
end
end

def span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
"#{MessagingHelper.queue_name(context)} publish"
when SQS_RECEIVE_MESSAGE
"#{MessagingHelper.queue_name(context)} receive"
else
client_method
def attributes(context, client_method, service_id, operation)
{
'aws.region' => context.config.region,
OpenTelemetry::SemanticConventions::Trace::RPC_SYSTEM => 'aws-api',
OpenTelemetry::SemanticConventions::Trace::RPC_METHOD => operation,
OpenTelemetry::SemanticConventions::Trace::RPC_SERVICE => service_id
}.tap do |attrs|
attrs[SemanticConventions::Trace::DB_SYSTEM] = 'dynamodb' if service_id == 'DynamoDB'
MessagingHelper.apply_span_attributes(context, attrs, client_method, service_id) if %w[SQS SNS].include?(service_id)
end
end
end

# A Seahorse::Client::Plugin that enables instrumentation for all AWS services
class Plugin < Seahorse::Client::Plugin
def add_handlers(handlers, config)
def add_handlers(handlers, _config)
# run before Seahorse::Client::Plugin::ParamValidator (priority 50)
handlers.add Handler, step: :validate, priority: 49
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base

install do |_config|
require_dependencies
add_plugin(Seahorse::Client::Base, *loaded_constants)
add_plugins(Seahorse::Client::Base, *loaded_service_clients)
end

present do
Expand Down Expand Up @@ -41,31 +41,39 @@ def gem_version

def require_dependencies
require_relative 'handler'
require_relative 'services'
require_relative 'message_attributes'
require_relative 'messaging_helper'
end

def add_plugin(*targets)
def add_plugins(*targets)
targets.each { |klass| klass.add_plugin(AwsSdk::Plugin) }
end

def loaded_constants
# Cross-check services against loaded AWS constants
# Module#const_get can return a constant from ancestors when there's a miss.
# If this conincidentally matches another constant, it will attempt to patch
# the wrong constant, resulting in patch failure.
available_services = ::Aws.constants & SERVICES.map(&:to_sym)
available_services.each_with_object([]) do |service, constants|
next if ::Aws.autoload?(service)
def loaded_service_clients
::Aws.constants.each_with_object([]) do |c, constants|
m = ::Aws.const_get(c)
next unless loaded_service?(c, m)

begin
constants << ::Aws.const_get(service, false).const_get(:Client, false)
constants << m.const_get(:Client)
rescue StandardError => e
OpenTelemetry.logger.warn("Constant could not be loaded: #{e}")
end
end
end

# This check does the following:
# 1 - Checks if the service client is autoload or not
# 2 - Validates whether if is a service client
# note that Seahorse::Client::Base is a superclass for V3 clients
# but for V2, it is Aws::Client
def loaded_service?(constant, service_module)
!::Aws.autoload?(constant) &&
service_module.is_a?(Module) &&
service_module.const_defined?(:Client) &&
(service_module.const_get(:Client).superclass == Seahorse::Client::Base ||
service_module.const_get(:Client).superclass == Aws::Client)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
module OpenTelemetry
module Instrumentation
module AwsSdk
# MessagingHelper class provides methods for calculating messaging span attributes
# An utility class to help SQS/SNS-related span attributes/context injection
class MessagingHelper
class << self
SQS_SEND_MESSAGE = 'SQS.SendMessage'
SQS_SEND_MESSAGE_BATCH = 'SQS.SendMessageBatch'
SQS_RECEIVE_MESSAGE = 'SQS.ReceiveMessage'
SNS_PUBLISH = 'SNS.Publish'
SEND_MESSAGE_CLIENT_METHODS = [SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH].freeze

def queue_name(context)
topic_arn = context.params[:topic_arn]
target_arn = context.params[:target_arn]
Expand All @@ -28,19 +34,64 @@ def queue_name(context)
'unknown'
end

def legacy_span_name(context, client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
"#{MessagingHelper.queue_name(context)} publish"
when SQS_RECEIVE_MESSAGE
"#{MessagingHelper.queue_name(context)} receive"
else
client_method
end
end

def apply_span_attributes(context, attrs, client_method, service_id)
case service_id
when 'SQS'
apply_sqs_attributes(attrs, context, client_method)
when 'SNS'
apply_sns_attributes(attrs, context, client_method)
end
end

def span_kind(client_method)
case client_method
when SQS_SEND_MESSAGE, SQS_SEND_MESSAGE_BATCH, SNS_PUBLISH
OpenTelemetry::Trace::SpanKind::PRODUCER
when SQS_RECEIVE_MESSAGE
OpenTelemetry::Trace::SpanKind::CONSUMER
else
OpenTelemetry::Trace::SpanKind::CLIENT
end
end

def inject_context(context, client_method)
return unless SEND_MESSAGE_CLIENT_METHODS.include?(client_method)

if client_method == SQS_SEND_MESSAGE_BATCH
context.params[:entries].each do |entry|
entry[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(entry[:message_attributes], setter: MessageAttributeSetter)
end
else
context.params[:message_attributes] ||= {}
OpenTelemetry.propagation.inject(context.params[:message_attributes], setter: MessageAttributeSetter)
end
end

private

def apply_sqs_attributes(attributes, context, client_method)
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sqs'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'queue'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)
attributes[SemanticConventions::Trace::MESSAGING_URL] = context.params[:queue_url] if context.params[:queue_url]

attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == 'SQS.ReceiveMessage'
attributes[SemanticConventions::Trace::MESSAGING_OPERATION] = 'receive' if client_method == SQS_RECEIVE_MESSAGE
end

def apply_sns_attributes(attributes, context, client_method)
attributes[SemanticConventions::Trace::MESSAGING_SYSTEM] = 'aws.sns'

return unless client_method == 'SNS.Publish'
return unless client_method == SNS_PUBLISH

attributes[SemanticConventions::Trace::MESSAGING_DESTINATION_KIND] = 'topic'
attributes[SemanticConventions::Trace::MESSAGING_DESTINATION] = queue_name(context)
Expand Down

This file was deleted.

Loading

0 comments on commit 7671f4e

Please sign in to comment.