Skip to content

Commit

Permalink
v2: feat(contrib): migrate all pending contribs (#2829)
Browse files Browse the repository at this point in the history
  • Loading branch information
darccio committed Aug 27, 2024
1 parent f231f23 commit fcf8440
Show file tree
Hide file tree
Showing 63 changed files with 1,231 additions and 857 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ contrib_coverage*.txt
gotestsum-report*.xml
/internal/apps/unit-of-work/unit-of-work
tools/v2check/_stage/v2playground/*
static-analysis.datadog.yml
19 changes: 6 additions & 13 deletions contrib/IBM/sarama/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ package sarama
import (
"math"

"github.com/DataDog/dd-trace-go/v2/internal"
"github.com/DataDog/dd-trace-go/v2/internal/namingschema"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
)

const defaultServiceName = "kafka"

type config struct {
consumerServiceName string
producerServiceName string
Expand All @@ -23,18 +20,14 @@ type config struct {
}

func defaults(cfg *config) {
cfg.consumerServiceName = namingschema.ServiceName(defaultServiceName)
cfg.producerServiceName = namingschema.ServiceNameOverrideV0(defaultServiceName, defaultServiceName)
cfg.consumerServiceName = instr.ServiceName(instrumentation.ComponentConsumer, nil)
cfg.producerServiceName = instr.ServiceName(instrumentation.ComponentProducer, nil)

cfg.consumerSpanName = namingschema.OpName(namingschema.KafkaInbound)
cfg.producerSpanName = namingschema.OpName(namingschema.KafkaOutbound)
cfg.consumerSpanName = instr.OperationName(instrumentation.ComponentConsumer, nil)
cfg.producerSpanName = instr.OperationName(instrumentation.ComponentProducer, nil)

// cfg.analyticsRate = globalconfig.AnalyticsRate()
if internal.BoolEnv("DD_TRACE_SARAMA_ANALYTICS_ENABLED", false) {
cfg.analyticsRate = 1.0
} else {
cfg.analyticsRate = math.NaN()
}
cfg.analyticsRate = instr.AnalyticsRate(false)
}

// Option describes options for the Sarama integration.
Expand Down
20 changes: 9 additions & 11 deletions contrib/IBM/sarama/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ import (
"github.com/DataDog/dd-trace-go/v2/ddtrace"
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"
"github.com/DataDog/dd-trace-go/v2/instrumentation"

"github.com/IBM/sarama"
)

const componentName = "IBM/sarama"
var instr *instrumentation.Instrumentation

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported("github.com/IBM/sarama")
instr = instrumentation.Load(instrumentation.PackageIBMSarama)
}

type partitionConsumer struct {
Expand All @@ -44,7 +42,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
for _, opt := range opts {
opt.apply(cfg)
}
log.Debug("contrib/IBM/sarama: Wrapping Partition Consumer: %#v", cfg)
instr.Logger().Debug("contrib/IBM/sarama: Wrapping Partition Consumer: %#v", cfg)
wrapped := &partitionConsumer{
PartitionConsumer: pc,
messages: make(chan *sarama.ConsumerMessage),
Expand All @@ -60,7 +58,7 @@ func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.P
tracer.SpanType(ext.SpanTypeMessageConsumer),
tracer.Tag(ext.MessagingKafkaPartition, msg.Partition),
tracer.Tag("offset", msg.Offset),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.Component, instrumentation.PackageIBMSarama),
tracer.Tag(ext.SpanKind, ext.SpanKindConsumer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
tracer.Measured(),
Expand Down Expand Up @@ -155,7 +153,7 @@ func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer,
for _, opt := range opts {
opt.apply(cfg)
}
log.Debug("contrib/IBM/sarama: Wrapping Sync Producer: %#v", cfg)
instr.Logger().Debug("contrib/IBM/sarama: Wrapping Sync Producer: %#v", cfg)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}
Expand Down Expand Up @@ -199,12 +197,12 @@ func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts
for _, opt := range opts {
opt.apply(cfg)
}
log.Debug("contrib/IBM/sarama: Wrapping Async Producer: %#v", cfg)
instr.Logger().Debug("contrib/IBM/sarama: Wrapping Async Producer: %#v", cfg)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
saramaConfig.Version = sarama.V0_11_0_0
} else if !saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) {
log.Error("Tracing Sarama async producer requires at least sarama.V0_11_0_0 version")
instr.Logger().Error("Tracing Sarama async producer requires at least sarama.V0_11_0_0 version")
}
wrapped := &asyncProducer{
AsyncProducer: p,
Expand Down Expand Up @@ -269,7 +267,7 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
tracer.ServiceName(cfg.producerServiceName),
tracer.ResourceName("Produce Topic " + msg.Topic),
tracer.SpanType(ext.SpanTypeMessageProducer),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.Component, instrumentation.PackageIBMSarama),
tracer.Tag(ext.SpanKind, ext.SpanKindProducer),
tracer.Tag(ext.MessagingSystem, ext.MessagingSystemKafka),
}
Expand Down
69 changes: 0 additions & 69 deletions contrib/IBM/sarama/sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,76 +13,11 @@ import (
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/mocktracer"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal/contrib/namingschematest"

"github.com/IBM/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func genTestSpans(t *testing.T, serviceOverride string) []*mocktracer.Span {
var opts []Option
if serviceOverride != "" {
opts = append(opts, WithService(serviceOverride))
}
mt := mocktracer.Start()
defer mt.Stop()

broker := sarama.NewMockBroker(t, 1)
defer broker.Close()

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader("test-topic", 0, broker.BrokerID()),
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset("test-topic", 0, sarama.OffsetOldest, 0).
SetOffset("test-topic", 0, sarama.OffsetNewest, 1),
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
SetMessage("test-topic", 0, 0, sarama.StringEncoder("hello")),
"ProduceRequest": sarama.NewMockProduceResponse(t).
SetError("test-topic", 0, sarama.ErrNoError),
})
cfg := sarama.NewConfig()
cfg.Version = sarama.MinVersion
cfg.Producer.Return.Successes = true
cfg.Producer.Flush.Messages = 1

producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, cfg)
require.NoError(t, err)
producer = WrapSyncProducer(cfg, producer, opts...)

c, err := sarama.NewConsumer([]string{broker.Addr()}, cfg)
require.NoError(t, err)
defer func(c sarama.Consumer) {
err := c.Close()
require.NoError(t, err)
}(c)
c = WrapConsumer(c, opts...)

msg1 := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("test 1"),
Metadata: "test",
}
_, _, err = producer.SendMessage(msg1)
require.NoError(t, err)

pc, err := c.ConsumePartition("test-topic", 0, 0)
if err != nil {
t.Fatal(err)
}
_ = <-pc.Messages()
err = pc.Close()
require.NoError(t, err)
// wait for the channel to be closed
<-pc.Messages()

spans := mt.FinishedSpans()
require.Len(t, spans, 2)
return spans
}

func TestConsumer(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()
Expand Down Expand Up @@ -356,10 +291,6 @@ func TestAsyncProducer(t *testing.T) {
})
}

func TestNamingSchema(t *testing.T) {
namingschematest.NewKafkaTest(genTestSpans)(t)
}

func newMockBroker(t *testing.T) *sarama.MockBroker {
broker := sarama.NewMockBroker(t, 1)

Expand Down
14 changes: 6 additions & 8 deletions contrib/gorilla/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,17 @@ import (
httptrace "github.com/DataDog/dd-trace-go/contrib/net/http/v2"
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
httptraceinternal "github.com/DataDog/dd-trace-go/v2/instrumentation/httptrace"
"github.com/DataDog/dd-trace-go/v2/instrumentation/options"
httptraceinternal "github.com/DataDog/dd-trace-go/v2/internal/contrib/httptrace"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"

"github.com/gorilla/mux"
)

const componentName = "gorilla/mux"
var instr *instrumentation.Instrumentation

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported("github.com/gorilla/mux")
instr = instrumentation.Load(instrumentation.PackageGorillaMux)
}

// Router registers routes to be matched and dispatches a handler.
Expand Down Expand Up @@ -124,9 +122,9 @@ func (r *Router) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// requests and responses served by the router.
func WrapRouter(router *mux.Router, opts ...RouterOption) *Router {
cfg := newConfig(opts)
cfg.spanOpts = append(cfg.spanOpts, tracer.Tag(ext.Component, componentName))
cfg.spanOpts = append(cfg.spanOpts, tracer.Tag(ext.Component, instrumentation.PackageGorillaMux))
cfg.spanOpts = append(cfg.spanOpts, tracer.Tag(ext.SpanKind, ext.SpanKindServer))
log.Debug("contrib/gorilla/mux: Configuring Router: %#v", cfg)
instr.Logger().Debug("contrib/gorilla/mux: Configuring Router: %#v", cfg)
return &Router{
Router: router,
config: cfg,
Expand Down
73 changes: 20 additions & 53 deletions contrib/gorilla/mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import (
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/ddtrace/mocktracer"
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
"github.com/DataDog/dd-trace-go/v2/internal/appsec"
"github.com/DataDog/dd-trace-go/v2/internal/contrib/namingschematest"
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
"github.com/DataDog/dd-trace-go/v2/internal/normalizer"
"github.com/DataDog/dd-trace-go/v2/instrumentation"
"github.com/DataDog/dd-trace-go/v2/instrumentation/testutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -155,64 +153,58 @@ func TestWithHeaderTags(t *testing.T) {
assert := assert.New(t)
assert.Equal(len(spans), 1)
s := spans[0]
for _, arg := range htArgs {
_, tag := normalizer.HeaderTag(arg)
instrumentation.NewHeaderTags(htArgs).Iter(func(_ string, tag string) {
assert.NotContains(s.Tags(), tag)
}
})
})
t.Run("integration", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

htArgs := []string{"h!e@a-d.e*r", "2header:tag"}
r := setupReq(WithHeaderTags(htArgs))
_ = setupReq(WithHeaderTags(htArgs))
spans := mt.FinishedSpans()
assert := assert.New(t)
assert.Equal(len(spans), 1)
s := spans[0]

for _, arg := range htArgs {
header, tag := normalizer.HeaderTag(arg)
assert.Equal(strings.Join(r.Header.Values(header), ","), s.Tags()[tag])
}
assert.Equal("val,val2", s.Tags()["http.request.headers.h_e_a-d_e_r"])
assert.Equal("2val", s.Tags()["tag"])
assert.NotContains(s.Tags(), "http.headers.x-datadog-header")
})
t.Run("global", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

header, tag := normalizer.HeaderTag("3header")
globalconfig.SetHeaderTag(header, tag)
testutils.SetGlobalHeaderTags(t, "3header")

r := setupReq()
_ = setupReq()
spans := mt.FinishedSpans()
assert := assert.New(t)
assert.Equal(len(spans), 1)
s := spans[0]

assert.Equal(strings.Join(r.Header.Values(header), ","), s.Tags()[tag])
assert.Equal("3val", s.Tags()["http.request.headers.3header"])
assert.NotContains(s.Tags(), "http.request.headers.other")
assert.NotContains(s.Tags(), "http.headers.x-datadog-header")
})
t.Run("override", func(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

globalH, globalT := normalizer.HeaderTag("3header")
globalconfig.SetHeaderTag(globalH, globalT)
testutils.SetGlobalHeaderTags(t, "3header")

htArgs := []string{"h!e@a-d.e*r", "2header:tag"}
r := setupReq(WithHeaderTags(htArgs))
_ = setupReq(WithHeaderTags(htArgs))
spans := mt.FinishedSpans()
assert := assert.New(t)
assert.Equal(len(spans), 1)
s := spans[0]

for _, arg := range htArgs {
header, tag := normalizer.HeaderTag(arg)
assert.Equal(strings.Join(r.Header.Values(header), ","), s.Tags()[tag])
}
assert.Equal("val,val2", s.Tags()["http.request.headers.h_e_a-d_e_r"])
assert.Equal("2val", s.Tags()["tag"])
assert.NotContains(s.Tags(), "http.headers.x-datadog-header")
assert.NotContains(s.Tags(), globalT)
assert.NotContains(s.Tags(), "http.request.headers.3header")
})
}

Expand Down Expand Up @@ -296,9 +288,7 @@ func TestAnalyticsSettings(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

rate := globalconfig.AnalyticsRate()
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)
testutils.SetGlobalAnalyticsRate(t, 0.4)

assertRate(t, mt, 0.4)
})
Expand All @@ -321,9 +311,7 @@ func TestAnalyticsSettings(t *testing.T) {
mt := mocktracer.Start()
defer mt.Stop()

rate := globalconfig.AnalyticsRate()
defer globalconfig.SetAnalyticsRate(rate)
globalconfig.SetAnalyticsRate(0.4)
testutils.SetGlobalAnalyticsRate(t, 0.4)

assertRate(t, mt, 0.23, WithAnalyticsRate(0.23))
})
Expand Down Expand Up @@ -407,10 +395,9 @@ func okHandler() http.Handler {
}

func TestAppSec(t *testing.T) {
appsec.Start()
defer appsec.Stop()
testutils.StartAppSec(t)

if !appsec.Enabled() {
if !instr.AppSecEnabled() {
t.Skip("appsec disabled")
}

Expand Down Expand Up @@ -535,23 +522,3 @@ func TestAppSec(t *testing.T) {
require.True(t, strings.Contains(event.(string), "crs-933-130"))
})
}

func TestNamingSchema(t *testing.T) {
genSpans := namingschematest.GenSpansFn(func(t *testing.T, serviceOverride string) []*mocktracer.Span {
var opts []RouterOption
if serviceOverride != "" {
opts = append(opts, WithService(serviceOverride))
}
mt := mocktracer.Start()
defer mt.Stop()

mux := NewRouter(opts...)
mux.Handle("/200", okHandler())
req := httptest.NewRequest("GET", "/200", nil)
w := httptest.NewRecorder()
mux.ServeHTTP(w, req)

return mt.FinishedSpans()
})
namingschematest.NewHTTPServerTest(genSpans, "mux.router")(t)
}
Loading

0 comments on commit fcf8440

Please sign in to comment.