diff --git a/docs/docs/sdk/otelcourier.md b/docs/docs/sdk/otelcourier.md index f84e9ec..09217d1 100755 --- a/docs/docs/sdk/otelcourier.md +++ b/docs/docs/sdk/otelcourier.md @@ -11,20 +11,23 @@ Package otelcourier instruments the github.com/gojek/courier\-go package. ## Index - [Constants](#constants) -- [func DisableCallbackTracing\(opts \*traceOptions\)](#DisableCallbackTracing) -- [func DisablePublisherTracing\(opts \*traceOptions\)](#DisablePublisherTracing) -- [func DisableSubscriberTracing\(opts \*traceOptions\)](#DisableSubscriberTracing) -- [func DisableUnsubscriberTracing\(opts \*traceOptions\)](#DisableUnsubscriberTracing) +- [Variables](#variables) +- [func DefaultTopicAttributeTransformer\(\_ context.Context, topic string\) string](#DefaultTopicAttributeTransformer) +- [type BucketBoundaries](#BucketBoundaries) +- [type OTel](#OTel) + - [func New\(service string, opts ...Option\) \*OTel](#New) + - [func \(t \*OTel\) ApplyMiddlewares\(c UseMiddleware\)](#OTel.ApplyMiddlewares) + - [func \(t \*OTel\) PublisherMiddleware\(next courier.Publisher\) courier.Publisher](#OTel.PublisherMiddleware) + - [func \(t \*OTel\) SubscriberMiddleware\(next courier.Subscriber\) courier.Subscriber](#OTel.SubscriberMiddleware) + - [func \(t \*OTel\) UnsubscriberMiddleware\(next courier.Unsubscriber\) courier.Unsubscriber](#OTel.UnsubscriberMiddleware) - [type Option](#Option) + - [func WithInfoHandlerFrom\(c interface\{ InfoHandler\(\) http.Handler \}\) Option](#WithInfoHandlerFrom) + - [func WithMeterProvider\(provider metric.MeterProvider\) Option](#WithMeterProvider) - [func WithTextMapCarrierExtractFunc\(fn func\(context.Context\) propagation.TextMapCarrier\) Option](#WithTextMapCarrierExtractFunc) - [func WithTextMapPropagator\(propagator propagation.TextMapPropagator\) Option](#WithTextMapPropagator) - [func WithTracerProvider\(provider oteltrace.TracerProvider\) Option](#WithTracerProvider) -- [type Tracer](#Tracer) - - [func NewTracer\(service string, opts ...Option\) \*Tracer](#NewTracer) - - [func \(t \*Tracer\) ApplyTraceMiddlewares\(c \*courier.Client\)](#Tracer.ApplyTraceMiddlewares) - - [func \(t \*Tracer\) PublisherMiddleware\(next courier.Publisher\) courier.Publisher](#Tracer.PublisherMiddleware) - - [func \(t \*Tracer\) SubscriberMiddleware\(next courier.Subscriber\) courier.Subscriber](#Tracer.SubscriberMiddleware) - - [func \(t \*Tracer\) UnsubscriberMiddleware\(next courier.Unsubscriber\) courier.Unsubscriber](#Tracer.UnsubscriberMiddleware) +- [type TopicAttributeTransformer](#TopicAttributeTransformer) +- [type UseMiddleware](#UseMiddleware) ## Constants @@ -41,100 +44,76 @@ const ( MQTTTopicWithQoS = attribute.Key("mqtt.topicwithqos") // MQTTRetained is the attribute key for tracing message retained flag MQTTRetained = attribute.Key("mqtt.retained") + // MQTTClientID is the attribute key for tracing mqtt client id + MQTTClientID = attribute.Key("mqtt.clientid") ) ``` - -## func [DisableCallbackTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L59) +## Variables -```go -func DisableCallbackTracing(opts *traceOptions) -``` - -DisableCallbackTracing disables implicit tracing on subscription callbacks. - - -## func [DisablePublisherTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L62) +DisableCallbackTracing disables implicit tracing on subscription callbacks. ```go -func DisablePublisherTracing(opts *traceOptions) +var DisableCallbackTracing = &disableTracePathOpt{traceCallback} ``` -DisablePublisherTracing disables courier.Publisher tracing. - - -## func [DisableSubscriberTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L65) +DisablePublisherTracing disables courier.Publisher tracing. ```go -func DisableSubscriberTracing(opts *traceOptions) +var DisablePublisherTracing = &disableTracePathOpt{tracePublisher} ``` -DisableSubscriberTracing disables courier.Subscriber tracing. - - -## func [DisableUnsubscriberTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L68) +DisableSubscriberTracing disables courier.Subscriber tracing. ```go -func DisableUnsubscriberTracing(opts *traceOptions) +var DisableSubscriberTracing = &disableTracePathOpt{traceSubscriber} ``` -DisableUnsubscriberTracing disables courier.Unsubscriber tracing. - - -## type [Option](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L30) - -Option helps configure trace options. +DisableUnsubscriberTracing disables courier.Unsubscriber tracing. ```go -type Option func(*traceOptions) +var DisableUnsubscriberTracing = &disableTracePathOpt{traceUnsubscriber} ``` - -### func [WithTextMapCarrierExtractFunc](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L54) + +## func [DefaultTopicAttributeTransformer](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L63) ```go -func WithTextMapCarrierExtractFunc(fn func(context.Context) propagation.TextMapCarrier) Option +func DefaultTopicAttributeTransformer(_ context.Context, topic string) string ``` -WithTextMapCarrierExtractFunc is used to specify the function which should be used to extract propagation.TextMapCarrier from the ongoing context.Context. +DefaultTopicAttributeTransformer is the default transformer for topic attribute. - -### func [WithTextMapPropagator](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L48) + +## type [BucketBoundaries](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L66-L68) -```go -func WithTextMapPropagator(propagator propagation.TextMapPropagator) Option -``` - -WithTextMapPropagator specifies the propagator to use for extracting/injecting key\-value texts. If none is specified, the global provider is used. - - -### func [WithTracerProvider](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L42) +BucketBoundaries helps override default histogram bucket boundaries for metrics. ```go -func WithTracerProvider(provider oteltrace.TracerProvider) Option +type BucketBoundaries struct { + Publisher, Subscriber, Unsubscriber, Callback []float64 +} ``` -WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used. - - -## type [Tracer](https://github.com/gojek/courier-go/blob/main/otelcourier/trace.go#L17-L23) + +## type [OTel](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L29-L41) -Tracer implements tracing abilities using OpenTelemetry SDK. +OTel implements tracing & metric abilities using OpenTelemetry SDK. ```go -type Tracer struct { +type OTel struct { // contains filtered or unexported fields } ``` - -### func [NewTracer](https://github.com/gojek/courier-go/blob/main/otelcourier/trace.go#L26) + +### func [New](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L44) ```go -func NewTracer(service string, opts ...Option) *Tracer +func New(service string, opts ...Option) *OTel ``` -NewTracer creates a new Tracer with Option\(s\). +New creates a new OTel with Option\(s\).
Example

@@ -142,84 +121,175 @@ NewTracer creates a new Tracer with Option\(s\). ```go -package main - -import ( - "context" - "os" - "os/signal" - "syscall" +tp := trace.NewTracerProvider() +defer tp.Shutdown(context.Background()) - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/sdk/trace" - - courier "github.com/gojek/courier-go" - "github.com/gojek/courier-go/otelcourier" +exporter, err := prometheus.New( +/* Add a non-default prometheus registry here with `prometheus.WithRegisterer` option, if needed. */ ) +if err != nil { + panic(err) +} +mp := metric.NewMeterProvider(metric.WithReader(exporter)) -func main() { - tp := trace.NewTracerProvider() - defer tp.Shutdown(context.Background()) +otel.SetTracerProvider(tp) +otel.SetMeterProvider(mp) +otel.SetTextMapPropagator(&propagation.TraceContext{}) - otel.SetTracerProvider(tp) +metricLabelMapper := otelcourier.TopicAttributeTransformer(func(ctx context.Context, topic string) string { + if strings.HasPrefix(topic, "test") { + return "test" + } - c, _ := courier.NewClient() - otelcourier.NewTracer("service-name").ApplyTraceMiddlewares(c) + return "other" +}) - if err := c.Start(); err != nil { - panic(err) - } +c, _ := courier.NewClient() +otelcourier.New( + "service-name", + // Use this to also track active connections. + otelcourier.WithInfoHandlerFrom(c), + metricLabelMapper, +).ApplyMiddlewares(c) - stopCh := make(chan os.Signal, 1) - signal.Notify(stopCh, []os.Signal{os.Interrupt, syscall.SIGTERM}...) +if err := c.Start(); err != nil { + panic(err) +} - if err := c.Publish( - context.Background(), "test-topic", "message", courier.QOSOne); err != nil { - panic(err) - } - <-stopCh +ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + +if err := c.Publish( + context.Background(), "test-topic", "message", courier.QOSOne); err != nil { + panic(err) +} - c.Stop() +if err := c.Publish( + context.Background(), "other-topic", "message", courier.QOSOne); err != nil { + panic(err) } + +// Here, you can expose the metrics at /metrics endpoint for prometheus.DefaultRegisterer. + +<-ctx.Done() + +c.Stop() ```

- -### func \(\*Tracer\) [ApplyTraceMiddlewares](https://github.com/gojek/courier-go/blob/main/otelcourier/trace.go#L49) + +### func \(\*OTel\) [ApplyMiddlewares](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L81) ```go -func (t *Tracer) ApplyTraceMiddlewares(c *courier.Client) +func (t *OTel) ApplyMiddlewares(c UseMiddleware) ``` -ApplyTraceMiddlewares will instrument all the operations of a courier.Client instance according to Option\(s\) used. +ApplyMiddlewares will instrument all the operations of a UseMiddleware instance according to Option\(s\) used. - -### func \(\*Tracer\) [PublisherMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/publish.go#L20) + +### func \(\*OTel\) [PublisherMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/publish.go#L23) ```go -func (t *Tracer) PublisherMiddleware(next courier.Publisher) courier.Publisher +func (t *OTel) PublisherMiddleware(next courier.Publisher) courier.Publisher ``` PublisherMiddleware is a courier.PublisherMiddlewareFunc for tracing publish calls. - -### func \(\*Tracer\) [SubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/subscribe.go#L25) + +### func \(\*OTel\) [SubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/subscribe.go#L43) ```go -func (t *Tracer) SubscriberMiddleware(next courier.Subscriber) courier.Subscriber +func (t *OTel) SubscriberMiddleware(next courier.Subscriber) courier.Subscriber ``` SubscriberMiddleware is a courier.SubscriberMiddlewareFunc for tracing subscribe calls. - -### func \(\*Tracer\) [UnsubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/unsubscribe.go#L19) + +### func \(\*OTel\) [UnsubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/unsubscribe.go#L21) ```go -func (t *Tracer) UnsubscriberMiddleware(next courier.Unsubscriber) courier.Unsubscriber +func (t *OTel) UnsubscriberMiddleware(next courier.Unsubscriber) courier.Unsubscriber ``` UnsubscriberMiddleware is a courier.UnsubscriberMiddlewareFunc for tracing unsubscribe calls. + +## type [Option](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L14) + +Option helps configure trace options. + +```go +type Option interface { + // contains filtered or unexported methods +} +``` + + +### func [WithInfoHandlerFrom](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L46) + +```go +func WithInfoHandlerFrom(c interface{ InfoHandler() http.Handler }) Option +``` + +WithInfoHandlerFrom is used to specify the handler which should be used to extract client information from the courier.Client instance. + + +### func [WithMeterProvider](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L28) + +```go +func WithMeterProvider(provider metric.MeterProvider) Option +``` + +WithMeterProvider specifies a meter provider to use for creating a meter. If none is specified, the global provider is used. + + +### func [WithTextMapCarrierExtractFunc](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L40) + +```go +func WithTextMapCarrierExtractFunc(fn func(context.Context) propagation.TextMapCarrier) Option +``` + +WithTextMapCarrierExtractFunc is used to specify the function which should be used to extract propagation.TextMapCarrier from the ongoing context.Context. + + +### func [WithTextMapPropagator](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L34) + +```go +func WithTextMapPropagator(propagator propagation.TextMapPropagator) Option +``` + +WithTextMapPropagator specifies the propagator to use for extracting/injecting key\-value texts. If none is specified, the global provider is used. + + +### func [WithTracerProvider](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L22) + +```go +func WithTracerProvider(provider oteltrace.TracerProvider) Option +``` + +WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used. + + +## type [TopicAttributeTransformer](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L18) + +TopicAttributeTransformer helps transform topic before making an attribute for it. It is used in metric recording only. Traces use the original topic. + +```go +type TopicAttributeTransformer func(context.Context, string) string +``` + + +## type [UseMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L22-L26) + +UseMiddleware is an interface that defines the methods to apply middlewares to a courier.Client or similar instance. + +```go +type UseMiddleware interface { + UsePublisherMiddleware(mwf ...courier.PublisherMiddlewareFunc) + UseSubscriberMiddleware(mwf ...courier.SubscriberMiddlewareFunc) + UseUnsubscriberMiddleware(mwf ...courier.UnsubscriberMiddlewareFunc) +} +``` + Generated by [gomarkdoc](https://github.com/princjef/gomarkdoc) diff --git a/otelcourier/example_test.go b/otelcourier/example_test.go index f108bb0..1cdc78e 100644 --- a/otelcourier/example_test.go +++ b/otelcourier/example_test.go @@ -4,36 +4,69 @@ import ( "context" "os" "os/signal" - "syscall" + "strings" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" - courier "github.com/gojek/courier-go" + "github.com/gojek/courier-go" "github.com/gojek/courier-go/otelcourier" ) -func ExampleNewTracer() { +func ExampleNew() { tp := trace.NewTracerProvider() defer tp.Shutdown(context.Background()) + exporter, err := prometheus.New( + /* Add a non-default prometheus registry here with `prometheus.WithRegisterer` option, if needed. */ + ) + if err != nil { + panic(err) + } + mp := metric.NewMeterProvider(metric.WithReader(exporter)) + otel.SetTracerProvider(tp) + otel.SetMeterProvider(mp) + otel.SetTextMapPropagator(&propagation.TraceContext{}) + + metricLabelMapper := otelcourier.TopicAttributeTransformer(func(ctx context.Context, topic string) string { + if strings.HasPrefix(topic, "test") { + return "test" + } + + return "other" + }) c, _ := courier.NewClient() - otelcourier.NewTracer("service-name").ApplyTraceMiddlewares(c) + otelcourier.New( + "service-name", + // Use this to also track active connections. + otelcourier.WithInfoHandlerFrom(c), + metricLabelMapper, + ).ApplyMiddlewares(c) if err := c.Start(); err != nil { panic(err) } - stopCh := make(chan os.Signal, 1) - signal.Notify(stopCh, []os.Signal{os.Interrupt, syscall.SIGTERM}...) + ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) if err := c.Publish( context.Background(), "test-topic", "message", courier.QOSOne); err != nil { panic(err) } - <-stopCh + + if err := c.Publish( + context.Background(), "other-topic", "message", courier.QOSOne); err != nil { + panic(err) + } + + // Here, you can expose the metrics at /metrics endpoint for prometheus.DefaultRegisterer. + + <-ctx.Done() c.Stop() } diff --git a/otelcourier/go.mod b/otelcourier/go.mod index c548eb5..4af1c83 100644 --- a/otelcourier/go.mod +++ b/otelcourier/go.mod @@ -4,28 +4,37 @@ go 1.20 require ( github.com/gojek/courier-go v0.5.3 - github.com/stretchr/testify v1.8.4 - go.opentelemetry.io/otel v1.16.0 - go.opentelemetry.io/otel/sdk v1.16.0 - go.opentelemetry.io/otel/trace v1.16.0 + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/otel v1.24.0 + go.opentelemetry.io/otel/exporters/prometheus v0.46.0 + go.opentelemetry.io/otel/metric v1.24.0 + go.opentelemetry.io/otel/sdk v1.24.0 + go.opentelemetry.io/otel/trace v1.24.0 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eclipse/paho.mqtt.golang v1.4.3 // indirect - github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/gojekfarm/xtools/generic v0.4.1 // indirect - github.com/gorilla/websocket v1.5.0 // indirect - github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/gojekfarm/xtools/generic v0.6.0 // indirect + github.com/gorilla/websocket v1.5.1 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/kr/pretty v0.3.1 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/objx v0.5.1 // indirect - go.opentelemetry.io/otel/metric v1.16.0 // indirect - golang.org/x/net v0.14.0 // indirect - golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.11.0 // indirect + github.com/prometheus/client_golang v1.18.0 // indirect + github.com/prometheus/client_model v0.6.0 // indirect + github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect + go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect + golang.org/x/net v0.22.0 // indirect + golang.org/x/sync v0.6.0 // indirect + golang.org/x/sys v0.18.0 // indirect + google.golang.org/protobuf v1.32.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/otelcourier/go.sum b/otelcourier/go.sum index 1af49dd..d223d63 100644 --- a/otelcourier/go.sum +++ b/otelcourier/go.sum @@ -1,58 +1,66 @@ -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/gojekfarm/xtools/generic v0.4.1 h1:hOR1Tc8ZZYNg+twjRy7yJdF9NDTd6mjNAbEiI2H39CU= -github.com/gojekfarm/xtools/generic v0.4.1/go.mod h1:VA3++MjvoK19kunt2IWa+xICcazkM21WZUVfllAovWs= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/gojekfarm/xtools/generic v0.6.0 h1:wfhuwFGqOF6M+lUkp+lZK0/DZi9ibw4NENdihhnaIEA= +github.com/gojekfarm/xtools/generic v0.6.0/go.mod h1:T0yk+yt4s1jwr9Gbtyb/BADXGKRfCYIBz9pN1GnmkQA= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.1 h1:4VhoImhV/Bm0ToFkXFi8hXNXwpDRZ/ynw3amt82mzq0= -github.com/stretchr/objx v0.5.1/go.mod h1:/iHQpkQwBD6DLUmQ4pE+s1TXdob1mORJ4/UFdrifcy0= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= -go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= -go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= -go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= -go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= -go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= -go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= -go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ= +go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk/metric v1.24.0 h1:yyMQrPzF+k88/DbH7o4FMAs80puqd+9osbiBrJrz/w8= +go.opentelemetry.io/otel/sdk/metric v1.24.0/go.mod h1:I6Y5FjH6rvEnTTAYQz3Mmv2kl6Ek5IIrmwTLqMrrOE0= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/otelcourier/info_handler.go b/otelcourier/info_handler.go new file mode 100644 index 0000000..7acc82a --- /dev/null +++ b/otelcourier/info_handler.go @@ -0,0 +1,68 @@ +package otelcourier + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/gojek/courier-go" +) + +type infoHandler http.HandlerFunc + +type infoResponse struct { + Clients []courier.MQTTClientInfo `json:"clients,omitempty"` +} + +func (e infoHandler) callback(attrs ...attribute.KeyValue) metric.Int64Callback { + hf := http.HandlerFunc(e) + + return func(ctx context.Context, observer metric.Int64Observer) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/", nil) + if err != nil { + return err + } + + resp := inMemResponseWriter{headers: make(http.Header)} + + hf.ServeHTTP(&resp, req) + + var ir infoResponse + if err := json.NewDecoder(&resp.body).Decode(&ir); err != nil { + return err + } + + for _, cl := range ir.Clients { + observer.Observe( + boolInt64(cl.Connected), + metric.WithAttributes(append(attrs, MQTTClientID.String(cl.ClientID))...), + ) + } + + return nil + } +} + +type inMemResponseWriter struct { + body bytes.Buffer + headers http.Header + status int +} + +func (w *inMemResponseWriter) Header() http.Header { return w.headers } + +func (w *inMemResponseWriter) Write(b []byte) (int, error) { return w.body.Write(b) } + +func (w *inMemResponseWriter) WriteHeader(statusCode int) { w.status = statusCode } + +func boolInt64(connected bool) int64 { + if connected { + return 1 + } + + return 0 +} diff --git a/otelcourier/info_handler_test.go b/otelcourier/info_handler_test.go new file mode 100644 index 0000000..ac88cf9 --- /dev/null +++ b/otelcourier/info_handler_test.go @@ -0,0 +1,127 @@ +package otelcourier + +import ( + "context" + "net/http" + "os" + "os/signal" + "strconv" + "strings" + "testing" + "time" + + prom "github.com/prometheus/client_golang/prometheus" + pcm "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" + + "github.com/gojek/courier-go" +) + +var defOpts []courier.ClientOption + +func init() { + brokerAddress := os.Getenv("BROKER_ADDRESS") // host:port format + if len(brokerAddress) == 0 { + brokerAddress = "localhost:1883" + } + + list := strings.Split(brokerAddress, ":") + p, _ := strconv.Atoi(list[1]) + + defOpts = append(defOpts, courier.WithAddress(list[0], uint16(p)), courier.WithClientID("clientID")) +} + +func TestConnectedClientMetric(t *testing.T) { + t.Parallel() + + reg := prom.NewRegistry() + exporter, err := prometheus.New(prometheus.WithRegisterer(reg)) + require.NoError(t, err) + mp := metric.NewMeterProvider(metric.WithReader(exporter)) + + c, err := courier.NewClient(defOpts...) + _ = New("test-service", WithMeterProvider(mp), WithInfoHandlerFrom(c)) + assert.NoError(t, err) + + ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + errCh := make(chan error, 1) + + go func() { + errCh <- c.Run(ctx) + }() + + if !courier.WaitForConnection(c, 3*time.Second, 100*time.Millisecond) { + t.Fatal("client did not connect") + } + + <-time.After(1500 * time.Millisecond) + + got, err := reg.Gather() + assert.NoError(t, err) + + found := false + var metricFamily *pcm.MetricFamily + for _, mf := range got { + if mf.GetName() == "courier_client_connected" { + found = true + metricFamily = mf + break + } + } + + assert.True(t, found) + assert.Len(t, metricFamily.GetMetric(), 1) + + m := metricFamily.GetMetric()[0] + assert.EqualValues(t, 1, m.GetGauge().GetValue()) + + labelFound := false + for _, lp := range m.GetLabel() { + if lp.GetName() == "mqtt_clientid" { + assert.Equal(t, "clientID", lp.GetValue()) + labelFound = true + } + } + + assert.True(t, labelFound) +} + +func Test_boolInt64(t *testing.T) { + tests := []struct { + name string + connected bool + want int64 + }{ + { + name: "connected", + connected: true, + want: 1, + }, + { + name: "not connected", + connected: false, + want: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.EqualValuesf(t, tt.want, boolInt64(tt.connected), "boolInt64(%v)", tt.connected) + }) + } +} + +func TestInfoHandler_callback_error(t *testing.T) { + ih := infoHandler(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("invalid json")) + }) + + cb := ih.callback() + err := cb(context.Background(), nil) + assert.EqualError(t, err, "invalid character 'i' looking for beginning of value") + + assert.EqualError(t, cb(nil, nil), "net/http: nil Context") +} diff --git a/otelcourier/metric.go b/otelcourier/metric.go new file mode 100644 index 0000000..5afc6c4 --- /dev/null +++ b/otelcourier/metric.go @@ -0,0 +1,134 @@ +package otelcourier + +import ( + "context" + "fmt" + "time" + + prom "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +var ( + defaultBoundaries = prom.ExponentialBucketsRange(0.001, 1, 7) + defBucketBoundaries = map[tracePath][]float64{ + tracePublisher: prom.ExponentialBucketsRange(0.0001, 1, 10), + traceSubscriber: defaultBoundaries, + traceUnsubscriber: defaultBoundaries, + traceCallback: prom.ExponentialBucketsRange(0.001, 10, 15), + } + + metricFlows = map[tracePath]string{ + tracePublisher: "publish", + traceSubscriber: "subscribe", + traceUnsubscriber: "unsubscribe", + traceCallback: "subscribe.callback", + } +) + +func (t *OTel) initRecorders(histogramBoundaries map[tracePath][]float64) { + for path, flow := range metricFlows { + if !t.tracePaths.match(path) { + continue + } + + t.rc[path] = t.newRecorder(flow, histogramBoundaries[path]) + } + + if t.infoHandler != nil { + t.initInfoHandler() + } +} + +func (t *OTel) initInfoHandler() { + if _, err := t.meter.Int64ObservableUpDownCounter( + "courier.client.connected", + metric.WithDescription("Tells if a client is connected or not, partitioned by client ID."), + metric.WithInt64Callback(infoHandler(t.infoHandler.ServeHTTP). + callback(semconv.ServiceNameKey.String(t.service))), + ); err != nil { + panic(err) + } +} + +type recordersOp func(*recorders) error + +func (t *OTel) newRecorder(flow string, boundaries []float64) *recorders { + var rs recorders + + for _, op := range []recordersOp{ + func(r *recorders) error { + ac, err := t.meter.Int64Counter( + fmt.Sprintf("courier.%s.attempts", flow), + metric.WithDescription(fmt.Sprintf("Number of %s attempts", flow)), + ) + r.attempts = ac + + return err + }, + func(r *recorders) error { + fc, err := t.meter.Int64Counter( + fmt.Sprintf("courier.%s.failures", flow), + metric.WithDescription(fmt.Sprintf("Number of %s failures", flow)), + ) + r.failures = fc + + return err + }, + func(r *recorders) error { + lt, err := t.meter.Float64Histogram( + fmt.Sprintf("courier.%s.latency", flow), + metric.WithDescription(fmt.Sprintf("Latency of %s calls", flow)), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(boundaries...), + ) + r.latency = lt + + return err + }, + } { + if err := op(&rs); err != nil { + panic(err) + } + } + + return &rs +} + +type recorder map[tracePath]*recorders + +func (r recorder) incAttempt(ctx context.Context, path tracePath, opts ...metric.AddOption) { + c, ok := r[path] + if !ok { + return + } + + c.attempts.Add(ctx, 1, opts...) +} + +func (r recorder) incFailure(ctx context.Context, path tracePath, opts ...metric.AddOption) { + c, ok := r[path] + if !ok { + return + } + + c.failures.Add(ctx, 1, opts...) +} + +func (r recorder) recordLatency( + ctx context.Context, path tracePath, latency time.Duration, opts ...metric.RecordOption, +) { + c, ok := r[path] + if !ok { + return + } + + c.latency.Record(ctx, latency.Seconds(), opts...) +} + +type recorders struct { + attempts metric.Int64Counter + failures metric.Int64Counter + latency metric.Float64Histogram +} diff --git a/otelcourier/metric_test.go b/otelcourier/metric_test.go new file mode 100644 index 0000000..218f9ed --- /dev/null +++ b/otelcourier/metric_test.go @@ -0,0 +1,26 @@ +package otelcourier + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/sdk/metric" +) + +func Test_recorderOpsWithoutInitialization(t *testing.T) { + r := make(recorder) + + assert.NotPanics(t, func() { + ctx := context.Background() + r.incAttempt(ctx, tracePublisher) + r.incFailure(ctx, tracePublisher) + r.recordLatency(ctx, tracePublisher, 0) + }) +} + +func Test_recorderPanicsWithInvalidFlowName(t *testing.T) { + ot := &OTel{meter: metric.NewMeterProvider().Meter(tracerName)} + + assert.Panics(t, func() { _ = ot.newRecorder("invalid%flow", nil) }) +} diff --git a/otelcourier/options.go b/otelcourier/options.go index e996f79..68b05bd 100644 --- a/otelcourier/options.go +++ b/otelcourier/options.go @@ -2,67 +2,130 @@ package otelcourier import ( "context" + "net/http" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" oteltrace "go.opentelemetry.io/otel/trace" ) -type tracePath uint - -func (tp tracePath) match(o tracePath) bool { return tp&o != 0 } - -const ( - tracePublisher tracePath = 1 << iota - traceSubscriber - traceUnsubscriber - traceCallback -) - -type traceOptions struct { - tracerProvider oteltrace.TracerProvider - propagator propagation.TextMapPropagator - textMapCarrierExtractor func(context.Context) propagation.TextMapCarrier - tracePaths tracePath -} - // Option helps configure trace options. -type Option func(*traceOptions) +type Option interface{ apply(*options) } -func defaultOptions() *traceOptions { - return &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber + traceCallback, - } -} +// TopicAttributeTransformer helps transform topic before making an attribute for it. +// It is used in metric recording only. Traces use the original topic. +type TopicAttributeTransformer func(context.Context, string) string // WithTracerProvider specifies a tracer provider to use for creating a tracer. // If none is specified, the global provider is used. func WithTracerProvider(provider oteltrace.TracerProvider) Option { - return func(opts *traceOptions) { opts.tracerProvider = provider } + return optFn(func(opts *options) { opts.tracerProvider = provider }) +} + +// WithMeterProvider specifies a meter provider to use for creating a meter. +// If none is specified, the global provider is used. +func WithMeterProvider(provider metric.MeterProvider) Option { + return optFn(func(opts *options) { opts.meterProvider = provider }) } // WithTextMapPropagator specifies the propagator to use for extracting/injecting key-value texts. // If none is specified, the global provider is used. func WithTextMapPropagator(propagator propagation.TextMapPropagator) Option { - return func(opts *traceOptions) { opts.propagator = propagator } + return optFn(func(opts *options) { opts.propagator = propagator }) } // WithTextMapCarrierExtractFunc is used to specify the function which should be used to // extract propagation.TextMapCarrier from the ongoing context.Context. func WithTextMapCarrierExtractFunc(fn func(context.Context) propagation.TextMapCarrier) Option { - return func(opts *traceOptions) { opts.textMapCarrierExtractor = fn } + return optFn(func(opts *options) { opts.textMapCarrierExtractor = fn }) +} + +// WithInfoHandlerFrom is used to specify the handler which should be used to +// extract client information from the courier.Client instance. +func WithInfoHandlerFrom(c interface{ InfoHandler() http.Handler }) Option { + return optFn(func(opts *options) { opts.infoHandler = c.InfoHandler() }) } // DisableCallbackTracing disables implicit tracing on subscription callbacks. -func DisableCallbackTracing(opts *traceOptions) { opts.tracePaths &^= traceCallback } +var DisableCallbackTracing = &disableTracePathOpt{traceCallback} // DisablePublisherTracing disables courier.Publisher tracing. -func DisablePublisherTracing(opts *traceOptions) { opts.tracePaths &^= tracePublisher } +var DisablePublisherTracing = &disableTracePathOpt{tracePublisher} // DisableSubscriberTracing disables courier.Subscriber tracing. -func DisableSubscriberTracing(opts *traceOptions) { opts.tracePaths &^= traceSubscriber } +var DisableSubscriberTracing = &disableTracePathOpt{traceSubscriber} // DisableUnsubscriberTracing disables courier.Unsubscriber tracing. -func DisableUnsubscriberTracing(opts *traceOptions) { opts.tracePaths &^= traceUnsubscriber } +var DisableUnsubscriberTracing = &disableTracePathOpt{traceUnsubscriber} + +// DefaultTopicAttributeTransformer is the default transformer for topic attribute. +func DefaultTopicAttributeTransformer(_ context.Context, topic string) string { return topic } + +// BucketBoundaries helps override default histogram bucket boundaries for metrics. +type BucketBoundaries struct { + Publisher, Subscriber, Unsubscriber, Callback []float64 +} + +/// private types and functions + +type optFn func(*options) + +func (fn optFn) apply(opts *options) { fn(opts) } + +func defaultOptions() *options { + return &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber + traceCallback, + topicTransformer: DefaultTopicAttributeTransformer, + histogramBoundaries: defBucketBoundaries, + } +} + +const ( + tracePublisher tracePath = 1 << iota + traceSubscriber + traceUnsubscriber + traceCallback +) + +type options struct { + tracerProvider oteltrace.TracerProvider + meterProvider metric.MeterProvider + propagator propagation.TextMapPropagator + textMapCarrierExtractor func(context.Context) propagation.TextMapCarrier + tracePaths tracePath + topicTransformer TopicAttributeTransformer + infoHandler http.Handler + histogramBoundaries map[tracePath][]float64 +} + +type tracePath uint + +func (tp tracePath) match(o tracePath) bool { return tp&o != 0 } + +type disableTracePathOpt struct{ tracePath } + +func (o *disableTracePathOpt) apply(opts *options) { opts.tracePaths &^= o.tracePath } + +func (t TopicAttributeTransformer) apply(opts *options) { opts.topicTransformer = t } + +func (b BucketBoundaries) apply(opts *options) { + if b.Publisher != nil { + opts.histogramBoundaries[tracePublisher] = b.Publisher + } + + if b.Subscriber != nil { + opts.histogramBoundaries[traceSubscriber] = b.Subscriber + } + + if b.Unsubscriber != nil { + opts.histogramBoundaries[traceUnsubscriber] = b.Unsubscriber + } + + if b.Callback != nil { + opts.histogramBoundaries[traceCallback] = b.Callback + } +} diff --git a/otelcourier/options_test.go b/otelcourier/options_test.go index e4d9c9e..88d7e94 100644 --- a/otelcourier/options_test.go +++ b/otelcourier/options_test.go @@ -14,100 +14,151 @@ func TestOption(t *testing.T) { testcases := []struct { name string options []Option - want *traceOptions + want *options }{ { name: "DefaultOptions", - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber + traceCallback, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber + traceCallback, + histogramBoundaries: defBucketBoundaries, }, }, { name: "DisablePublisher", options: []Option{DisablePublisherTracing}, - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: traceSubscriber + traceUnsubscriber + traceCallback, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: traceSubscriber + traceUnsubscriber + traceCallback, + histogramBoundaries: defBucketBoundaries, }, }, { name: "DisableSubscriber", options: []Option{DisableSubscriberTracing}, - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: tracePublisher + traceUnsubscriber + traceCallback, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: tracePublisher + traceUnsubscriber + traceCallback, + histogramBoundaries: defBucketBoundaries, }, }, { name: "DisableUnsubscriber", options: []Option{DisableUnsubscriberTracing}, - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: tracePublisher + traceSubscriber + traceCallback, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: tracePublisher + traceSubscriber + traceCallback, + histogramBoundaries: defBucketBoundaries, }, }, { name: "DisableCallback", options: []Option{DisableCallbackTracing}, - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber, + histogramBoundaries: defBucketBoundaries, }, }, { name: "DisableCallbackTwice", options: []Option{DisableCallbackTracing, DisableCallbackTracing}, - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: tracePublisher + traceSubscriber + traceUnsubscriber, + histogramBoundaries: defBucketBoundaries, }, }, { name: "DisableAllButCallback", options: []Option{DisablePublisherTracing, DisableSubscriberTracing, DisableUnsubscriberTracing}, - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: traceCallback, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: traceCallback, + histogramBoundaries: defBucketBoundaries, }, }, { name: "DisableTwoTracers", options: []Option{DisablePublisherTracing, DisableSubscriberTracing}, - want: &traceOptions{ - tracerProvider: otel.GetTracerProvider(), - propagator: otel.GetTextMapPropagator(), - tracePaths: traceUnsubscriber + traceCallback, + want: &options{ + tracerProvider: otel.GetTracerProvider(), + meterProvider: otel.GetMeterProvider(), + propagator: otel.GetTextMapPropagator(), + tracePaths: traceUnsubscriber + traceCallback, + histogramBoundaries: defBucketBoundaries, }, }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - to := defaultOptions() + o := defaultOptions() for _, opt := range tt.options { - opt(to) + opt.apply(o) } - assert.Equal(t, tt.want, to) + // Ignore topicTransformer + o.topicTransformer = nil + + assert.Equal(t, tt.want, o) }) } extractorFn := func(_ context.Context) propagation.TextMapCarrier { return &propagation.MapCarrier{} } t.Run("TextMapCarrierExtractor", func(t *testing.T) { - to := defaultOptions() + o := defaultOptions() + + WithTextMapCarrierExtractFunc(extractorFn).apply(o) + + assert.Equal(t, fmt.Sprintf("%p", extractorFn), fmt.Sprintf("%p", o.textMapCarrierExtractor)) + }) + + t.Run("DefaultTopicAttributeTransformer", func(t *testing.T) { + assert.Equal(t, "topic", DefaultTopicAttributeTransformer(context.Background(), "topic")) + }) + + customTransformer := func(_ context.Context, topic string) string { return topic + "custom" } + + t.Run("CustomTopicAttributeTransformer", func(t *testing.T) { + o := defaultOptions() + + TopicAttributeTransformer(customTransformer).apply(o) + + assert.Equal(t, "topiccustom", o.topicTransformer(context.Background(), "topic")) + }) + + t.Run("BucketBoundaries", func(t *testing.T) { + o := defaultOptions() + + bb := BucketBoundaries{ + Publisher: []float64{1, 2, 3}, + Subscriber: []float64{4, 5, 6}, + Unsubscriber: []float64{7, 8, 9}, + Callback: []float64{10, 11, 12}, + } - WithTextMapCarrierExtractFunc(extractorFn)(to) + bb.apply(o) - assert.Equal(t, fmt.Sprintf("%p", extractorFn), fmt.Sprintf("%p", to.textMapCarrierExtractor)) + assert.Equal(t, []float64{1, 2, 3}, o.histogramBoundaries[tracePublisher]) + assert.Equal(t, []float64{4, 5, 6}, o.histogramBoundaries[traceSubscriber]) + assert.Equal(t, []float64{7, 8, 9}, o.histogramBoundaries[traceUnsubscriber]) + assert.Equal(t, []float64{10, 11, 12}, o.histogramBoundaries[traceCallback]) }) } diff --git a/otelcourier/otel.go b/otelcourier/otel.go new file mode 100644 index 0000000..83c7432 --- /dev/null +++ b/otelcourier/otel.go @@ -0,0 +1,93 @@ +package otelcourier + +import ( + "context" + "fmt" + "net/http" + "time" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + + "github.com/gojek/courier-go" +) + +const ( + tracerName = "github.com/gojek/courier-go/otelcourier" +) + +// UseMiddleware is an interface that defines the methods to +// apply middlewares to a courier.Client or similar instance. +type UseMiddleware interface { + UsePublisherMiddleware(mwf ...courier.PublisherMiddlewareFunc) + UseSubscriberMiddleware(mwf ...courier.SubscriberMiddlewareFunc) + UseUnsubscriberMiddleware(mwf ...courier.UnsubscriberMiddlewareFunc) +} + +// OTel implements tracing & metric abilities using OpenTelemetry SDK. +type OTel struct { + service string + tracePaths tracePath + tracer trace.Tracer + meter metric.Meter + propagator propagation.TextMapPropagator + textMapCarrierFunc func(context.Context) propagation.TextMapCarrier + topicTransformer TopicAttributeTransformer + infoHandler http.Handler + + rc recorder + tnow func() time.Time +} + +// New creates a new OTel with Option(s). +func New(service string, opts ...Option) *OTel { + o := defaultOptions() + + for _, opt := range opts { + opt.apply(o) + } + + vsn := fmt.Sprintf("semver:%s", courier.Version()) + tracer := o.tracerProvider.Tracer( + tracerName, + trace.WithInstrumentationVersion(vsn), + ) + meter := o.meterProvider.Meter( + tracerName, + metric.WithInstrumentationVersion(vsn), + ) + + t := &OTel{ + service: service, + tracer: tracer, + meter: meter, + propagator: o.propagator, + textMapCarrierFunc: o.textMapCarrierExtractor, + topicTransformer: o.topicTransformer, + tracePaths: o.tracePaths, + infoHandler: o.infoHandler, + rc: make(recorder), + tnow: time.Now, + } + + t.initRecorders(o.histogramBoundaries) + + return t +} + +// ApplyMiddlewares will instrument all the operations of a UseMiddleware instance +// according to Option(s) used. +func (t *OTel) ApplyMiddlewares(c UseMiddleware) { + if t.tracePaths.match(tracePublisher) { + c.UsePublisherMiddleware(t.PublisherMiddleware) + } + + if t.tracePaths.match(traceSubscriber) { + c.UseSubscriberMiddleware(t.SubscriberMiddleware) + } + + if t.tracePaths.match(traceUnsubscriber) { + c.UseUnsubscriberMiddleware(t.UnsubscriberMiddleware) + } +} diff --git a/otelcourier/trace_test.go b/otelcourier/otel_test.go similarity index 78% rename from otelcourier/trace_test.go rename to otelcourier/otel_test.go index c495d3e..99d7ab4 100644 --- a/otelcourier/trace_test.go +++ b/otelcourier/otel_test.go @@ -10,7 +10,7 @@ import ( "go.opentelemetry.io/otel/sdk/trace/tracetest" oteltrace "go.opentelemetry.io/otel/trace" - courier "github.com/gojek/courier-go" + "github.com/gojek/courier-go" ) func TestChildSpanFromGlobalTracer(t *testing.T) { @@ -19,9 +19,9 @@ func TestChildSpanFromGlobalTracer(t *testing.T) { tp.RegisterSpanProcessor(sr) otel.SetTracerProvider(tp) - mwf := NewTracer("test-service") + mwf := New("test-service") - p := mwf.PublisherMiddleware(courier.PublisherFunc(func(ctx context.Context, topic string, message interface{}, opts ...courier.Option) error { + p := mwf.PublisherMiddleware(courier.PublisherFunc(func(ctx context.Context, topic string, message any, opts ...courier.Option) error { span := oteltrace.SpanFromContext(ctx) _, ok := span.(trace.ReadWriteSpan) assert.True(t, ok) @@ -37,9 +37,9 @@ func TestChildSpanFromCustomTracer(t *testing.T) { sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) - m := NewTracer("test-service", WithTracerProvider(tp)) + m := New("test-service", WithTracerProvider(tp)) - p := m.PublisherMiddleware(courier.PublisherFunc(func(ctx context.Context, topic string, message interface{}, opts ...courier.Option) error { + p := m.PublisherMiddleware(courier.PublisherFunc(func(ctx context.Context, topic string, message any, opts ...courier.Option) error { span := oteltrace.SpanFromContext(ctx) _, ok := span.(trace.ReadWriteSpan) assert.True(t, ok) @@ -55,8 +55,8 @@ func TestInstrumentClient(t *testing.T) { sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) - tr := NewTracer("test-service", WithTracerProvider(tp)) + tr := New("test-service", WithTracerProvider(tp)) c, err := courier.NewClient(courier.WithAddress("localhost", 1883)) assert.NoError(t, err) - tr.ApplyTraceMiddlewares(c) + tr.ApplyMiddlewares(c) } diff --git a/otelcourier/publish.go b/otelcourier/publish.go index dc79fa0..e0621d9 100644 --- a/otelcourier/publish.go +++ b/otelcourier/publish.go @@ -2,10 +2,13 @@ package otelcourier import ( "context" + "time" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/otel/trace" "github.com/gojek/courier-go" @@ -17,23 +20,30 @@ const ( ) // PublisherMiddleware is a courier.PublisherMiddlewareFunc for tracing publish calls. -func (t *Tracer) PublisherMiddleware(next courier.Publisher) courier.Publisher { +func (t *OTel) PublisherMiddleware(next courier.Publisher) courier.Publisher { return courier.PublisherFunc(func( ctx context.Context, topic string, - message interface{}, + message any, opts ...courier.Option, ) error { - traceOpts := []trace.SpanStartOption{ - trace.WithAttributes(MQTTTopic.String(topic)), - trace.WithAttributes(semconv.ServiceNameKey.String(t.service)), - trace.WithSpanKind(trace.SpanKindProducer), - } - traceOpts = append(traceOpts, mapOptions(opts)...) + attrs := append([]attribute.KeyValue{ + semconv.ServiceNameKey.String(t.service), + }, mapAttributes(opts)...) + metricAttrs := metric.WithAttributes(append(attrs, MQTTTopic.String(t.topicTransformer(ctx, topic)))...) + + defer func(ctx context.Context, now time.Time, attrs metric.MeasurementOption) { + t.rc.recordLatency(ctx, tracePublisher, time.Since(now), attrs) + }(ctx, t.tnow(), metricAttrs) - ctx, span := t.tracer.Start(ctx, publishSpanName, traceOpts...) + ctx, span := t.tracer.Start(ctx, publishSpanName, + trace.WithAttributes(append(attrs, MQTTTopic.String(topic))...), + trace.WithSpanKind(trace.SpanKindProducer), + ) defer span.End() + t.rc.incAttempt(ctx, tracePublisher, metricAttrs) + if tmc, ok := message.(propagation.TextMapCarrier); ok { t.propagator.Inject(ctx, tmc) } @@ -42,21 +52,23 @@ func (t *Tracer) PublisherMiddleware(next courier.Publisher) courier.Publisher { if err != nil { span.RecordError(err) span.SetStatus(codes.Error, publishErrMessage) + + t.rc.incFailure(ctx, tracePublisher, metricAttrs) } return err }) } -func mapOptions(opts []courier.Option) []trace.SpanStartOption { - res := make([]trace.SpanStartOption, 0, len(opts)) +func mapAttributes(opts []courier.Option) []attribute.KeyValue { + res := make([]attribute.KeyValue, 0, len(opts)) for _, opt := range opts { switch opt := opt.(type) { case courier.QOSLevel: - res = append(res, trace.WithAttributes(MQTTQoS.Int(int(opt)))) + res = append(res, MQTTQoS.Int(int(opt))) case courier.Retained: - res = append(res, trace.WithAttributes(MQTTRetained.Bool(bool(opt)))) + res = append(res, MQTTRetained.Bool(bool(opt))) } } diff --git a/otelcourier/publish_test.go b/otelcourier/publish_test.go index 6b2477f..ccf5f42 100644 --- a/otelcourier/publish_test.go +++ b/otelcourier/publish_test.go @@ -1,58 +1,49 @@ package otelcourier import ( + "bytes" "context" "errors" + "fmt" "reflect" "regexp" "testing" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" oteltrace "go.opentelemetry.io/otel/trace" - courier "github.com/gojek/courier-go" + "github.com/gojek/courier-go" ) -type testTextMapCarrier struct { - headers map[string]string -} - -func (t *testTextMapCarrier) Get(key string) string { - return t.headers[key] -} - -func (t *testTextMapCarrier) Set(key string, value string) { - t.headers[key] = value -} - -func (t *testTextMapCarrier) Keys() []string { - keys := make([]string, 0, len(t.headers)) - for k := range t.headers { - keys = append(keys, k) - } - return keys -} - func TestPublishTraceSpan(t *testing.T) { tp := trace.NewTracerProvider() sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) + reg := prom.NewRegistry() + exporter, err := prometheus.New(prometheus.WithRegisterer(reg)) + require.NoError(t, err) + mp := metric.NewMeterProvider(metric.WithReader(exporter)) + mtmc := newMockTextMapCarrier(t, "hello-world") - mwf := NewTracer("test-service", WithTracerProvider(tp), + mwf := New("test-service", WithTracerProvider(tp), WithMeterProvider(mp), WithTextMapPropagator(propagation.NewCompositeTextMapPropagator(&propagation.TraceContext{}))) uErr := errors.New("error_from_upstream") - p := mwf.PublisherMiddleware(courier.PublisherFunc(func(ctx context.Context, topic string, message interface{}, opts ...courier.Option) error { + p := mwf.PublisherMiddleware(courier.PublisherFunc(func(ctx context.Context, topic string, message any, opts ...courier.Option) error { return uErr })) @@ -61,9 +52,7 @@ func TestPublishTraceSpan(t *testing.T) { return traceParentRegex.MatchString(in) })) - err := p.Publish(context.Background(), "test-topic", mtmc, courier.QOSOne, courier.Retained(false)) - - assert.EqualError(t, err, uErr.Error()) + assert.EqualError(t, p.Publish(context.Background(), "test-topic", mtmc, courier.QOSOne, courier.Retained(false)), uErr.Error()) spans := sr.Ended() require.Len(t, spans, 1) @@ -95,11 +84,43 @@ func TestPublishTraceSpan(t *testing.T) { assert.Equal(t, publishErrMessage, span.Status().Description) }) + t.Run("Metrics", func(t *testing.T) { + vsn := courier.Version() + buf := bytes.NewBufferString(fmt.Sprintf(`# HELP courier_publish_attempts_total Number of publish attempts +# TYPE courier_publish_attempts_total counter +courier_publish_attempts_total{mqtt_qos="1",mqtt_retained="false",mqtt_topic="test-topic",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +# HELP courier_publish_failures_total Number of publish failures +# TYPE courier_publish_failures_total counter +courier_publish_failures_total{mqtt_qos="1",mqtt_retained="false",mqtt_topic="test-topic",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +`, vsn, vsn)) + assert.NoError(t, testutil.GatherAndCompare(reg, buf, + "courier_publish_attempts_total", + "courier_publish_failures_total", + )) + + metrics, err := reg.Gather() + assert.NoError(t, err) + + var found bool + for _, metricFamily := range metrics { + if metricFamily.GetName() != "courier_publish_latency_seconds" { + continue + } + found = true + + for _, m := range metricFamily.GetMetric() { + assert.EqualValues(t, 1, m.GetHistogram().GetSampleCount()) + } + } + + assert.True(t, found) + }) + mtmc.AssertExpectations(t) } func TestPublishSpanNotInstrumented(t *testing.T) { - p := courier.PublisherFunc(func(ctx context.Context, _ string, _ interface{}, _ ...courier.Option) error { + p := courier.PublisherFunc(func(ctx context.Context, _ string, _ any, _ ...courier.Option) error { span := oteltrace.SpanFromContext(ctx) ok := !span.SpanContext().IsValid() assert.True(t, ok) diff --git a/otelcourier/semconv.go b/otelcourier/semconv.go index 8f1a8e0..47c20e1 100644 --- a/otelcourier/semconv.go +++ b/otelcourier/semconv.go @@ -13,4 +13,6 @@ const ( MQTTTopicWithQoS = attribute.Key("mqtt.topicwithqos") // MQTTRetained is the attribute key for tracing message retained flag MQTTRetained = attribute.Key("mqtt.retained") + // MQTTClientID is the attribute key for tracing mqtt client id + MQTTClientID = attribute.Key("mqtt.clientid") ) diff --git a/otelcourier/subscribe.go b/otelcourier/subscribe.go index 9f342b0..70c8726 100644 --- a/otelcourier/subscribe.go +++ b/otelcourier/subscribe.go @@ -4,14 +4,19 @@ import ( "context" "fmt" "reflect" + "regexp" "runtime" "sort" + "time" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/metric" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/otel/trace" - courier "github.com/gojek/courier-go" + "github.com/gojek/courier-go" ) const ( @@ -19,43 +24,89 @@ const ( subscribeMultipleSpanName = "otelcourier.SubscribeMultiple" subscribeErrMessage = "subscribe error" subscribeMultipleErrMessage = "subscribe multiple error" + + moduleNamedGroup = "module" + pkgNamedGroup = "pkg" + fnNamedGroup = "fn" +) + +var ( + runtimeCallbackExtractor = regexp.MustCompile( + fmt.Sprintf(`^(?P<%s>.+?)\/(?P<%s>[^\/.]+)\.(?P<%s>.+)$`, moduleNamedGroup, pkgNamedGroup, fnNamedGroup), + ) + moduleIndex = runtimeCallbackExtractor.SubexpIndex(moduleNamedGroup) + pkgIndex = runtimeCallbackExtractor.SubexpIndex(pkgNamedGroup) + fnIndex = runtimeCallbackExtractor.SubexpIndex(fnNamedGroup) ) // SubscriberMiddleware is a courier.SubscriberMiddlewareFunc for tracing subscribe calls. -func (t *Tracer) SubscriberMiddleware(next courier.Subscriber) courier.Subscriber { +func (t *OTel) SubscriberMiddleware(next courier.Subscriber) courier.Subscriber { return courier.NewSubscriberFuncs( func(ctx context.Context, topic string, callback courier.MessageHandler, opts ...courier.Option) error { - traceOpts := []trace.SpanStartOption{ - trace.WithAttributes(MQTTTopic.String(topic)), - trace.WithAttributes(semconv.ServiceNameKey.String(t.service)), - trace.WithSpanKind(trace.SpanKindClient), - } - traceOpts = append(traceOpts, mapOptions(opts)...) + attrs := append([]attribute.KeyValue{ + semconv.ServiceNameKey.String(t.service), + }, mapAttributes(opts)...) + metricAttrs := metric.WithAttributes(append(attrs, MQTTTopic.String(t.topicTransformer(ctx, topic)))...) + + defer func(ctx context.Context, now time.Time, attrs metric.MeasurementOption) { + t.rc.recordLatency(ctx, traceSubscriber, time.Since(now), attrs) + }(ctx, t.tnow(), metricAttrs) - ctx, span := t.tracer.Start(ctx, subscribeSpanName, traceOpts...) + ctx, span := t.tracer.Start(ctx, subscribeSpanName, + trace.WithAttributes(append(attrs, MQTTTopic.String(topic))...), + trace.WithSpanKind(trace.SpanKindClient), + ) defer span.End() + t.rc.incAttempt(ctx, traceSubscriber, metricAttrs) + err := next.Subscribe(ctx, topic, t.instrumentCallback(callback)) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, subscribeErrMessage) + + t.rc.incFailure(ctx, traceSubscriber, metricAttrs) } return err }, func(ctx context.Context, topicsWithQos map[string]courier.QOSLevel, callback courier.MessageHandler) error { - opts := []trace.SpanStartOption{ - trace.WithAttributes(MQTTTopicWithQoS.StringSlice(mapToArray(topicsWithQos))), - trace.WithAttributes(semconv.ServiceNameKey.String(t.service)), - trace.WithSpanKind(trace.SpanKindClient), + unnestMetricAttrs := make([]metric.MeasurementOption, 0, len(topicsWithQos)) + for topic, qos := range topicsWithQos { + unnestMetricAttrs = append(unnestMetricAttrs, metric.WithAttributes( + semconv.ServiceNameKey.String(t.service), + MQTTTopic.String(t.topicTransformer(ctx, topic)), + MQTTQoS.Int(int(qos)), + )) } - ctx, span := t.tracer.Start(ctx, subscribeMultipleSpanName, opts...) + + defer func(ctx context.Context, now time.Time, attrs ...metric.MeasurementOption) { + for _, attr := range attrs { + t.rc.recordLatency(ctx, traceSubscriber, time.Since(now), attr) + } + }(ctx, t.tnow(), unnestMetricAttrs...) + + ctx, span := t.tracer.Start(ctx, subscribeMultipleSpanName, + trace.WithAttributes( + semconv.ServiceNameKey.String(t.service), + MQTTTopicWithQoS.StringSlice(mapToArray(topicsWithQos)), + ), + trace.WithSpanKind(trace.SpanKindClient), + ) defer span.End() + for _, attr := range unnestMetricAttrs { + t.rc.incAttempt(ctx, traceSubscriber, attr) + } + err := next.SubscribeMultiple(ctx, topicsWithQos, t.instrumentCallback(callback)) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, subscribeMultipleErrMessage) + + for _, attr := range unnestMetricAttrs { + t.rc.incFailure(ctx, traceSubscriber, attr) + } } return err @@ -63,25 +114,56 @@ func (t *Tracer) SubscriberMiddleware(next courier.Subscriber) courier.Subscribe ) } -func (t *Tracer) instrumentCallback(in courier.MessageHandler) courier.MessageHandler { +func (t *OTel) instrumentCallback(in courier.MessageHandler) courier.MessageHandler { if !t.tracePaths.match(traceCallback) { return in } return func(ctx context.Context, pubSub courier.PubSub, msg *courier.Message) { spanName := "UnknownSubscribeCallback" + pkgName := "Unknown" + fnName := "UnknownFunc" + if fnPtr := runtime.FuncForPC(reflect.ValueOf(in).Pointer()); fnPtr != nil { - spanName = fnPtr.Name() + fullName := fnPtr.Name() + + if matches := runtimeCallbackExtractor.FindStringSubmatch(fullName); len(matches) > 0 { + spanName = matches[fnIndex] + pkgName = fmt.Sprintf("%s/%s", matches[moduleIndex], matches[pkgIndex]) + fnName = matches[fnIndex] + } + } + + attrs := []attribute.KeyValue{ + semconv.ServiceNameKey.String(t.service), + semconv.CodeNamespace(pkgName), + semconv.CodeFunction(fnName), + MQTTQoS.Int(int(msg.QoS)), + MQTTRetained.Bool(msg.Retained), } if t.textMapCarrierFunc != nil { ctx = t.propagator.Extract(ctx, t.textMapCarrierFunc(ctx)) } - ctx, span := t.tracer.Start(ctx, spanName) + metricAttrs := metric.WithAttributes(append(attrs, + MQTTTopic.String(t.topicTransformer(ctx, msg.Topic)), + )...) + + defer func(ctx context.Context, now time.Time, attrs metric.MeasurementOption) { + t.rc.recordLatency(ctx, traceCallback, time.Since(now), attrs) + }(ctx, t.tnow(), metricAttrs) + + ctx, span := t.tracer.Start(ctx, spanName, trace.WithAttributes(append(attrs, MQTTTopic.String(msg.Topic))...)) defer span.End() + t.rc.incAttempt(ctx, traceCallback, metricAttrs) + in(ctx, pubSub, msg) + + if ros, ok := span.(tracesdk.ReadOnlySpan); ok && ros.Status().Code == codes.Error { + t.rc.incFailure(ctx, traceCallback, metricAttrs) + } } } diff --git a/otelcourier/subscribe_test.go b/otelcourier/subscribe_test.go index 47582b6..9a67dc0 100644 --- a/otelcourier/subscribe_test.go +++ b/otelcourier/subscribe_test.go @@ -1,22 +1,29 @@ package otelcourier import ( + "bytes" "context" "errors" + "fmt" "reflect" + "strings" "testing" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" oteltrace "go.opentelemetry.io/otel/trace" - courier "github.com/gojek/courier-go" + "github.com/gojek/courier-go" ) type traceparent int @@ -28,7 +35,12 @@ func TestSubscribeTraceSpan(t *testing.T) { sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) - mwf := NewTracer("test-service", WithTracerProvider(tp)) + reg := prom.NewRegistry() + exporter, err := prometheus.New(prometheus.WithRegisterer(reg)) + require.NoError(t, err) + mp := metric.NewMeterProvider(metric.WithReader(exporter)) + + mwf := New("test-service", WithTracerProvider(tp), WithMeterProvider(mp)) uErr := errors.New("error_from_upstream") u := mwf.SubscriberMiddleware(courier.NewSubscriberFuncs( @@ -40,11 +52,10 @@ func TestSubscribeTraceSpan(t *testing.T) { }, )) - err := u.Subscribe(context.Background(), "test-topic", + assert.EqualError(t, u.Subscribe(context.Background(), "test-topic", func(_ context.Context, _ courier.PubSub, _ *courier.Message) {}, courier.QOSOne, - ) - assert.EqualError(t, err, uErr.Error()) + ), uErr.Error()) spans := sr.Ended() require.Len(t, spans, 1) @@ -76,6 +87,38 @@ func TestSubscribeTraceSpan(t *testing.T) { assert.Equal(t, codes.Error, span.Status().Code) assert.Equal(t, subscribeErrMessage, span.Status().Description) }) + + t.Run("Metrics", func(t *testing.T) { + vsn := courier.Version() + buf := bytes.NewBufferString(fmt.Sprintf(`# HELP courier_subscribe_attempts_total Number of subscribe attempts +# TYPE courier_subscribe_attempts_total counter +courier_subscribe_attempts_total{mqtt_qos="1",mqtt_topic="test-topic",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +# HELP courier_subscribe_failures_total Number of subscribe failures +# TYPE courier_subscribe_failures_total counter +courier_subscribe_failures_total{mqtt_qos="1",mqtt_topic="test-topic",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +`, vsn, vsn)) + assert.NoError(t, testutil.GatherAndCompare(reg, buf, + "courier_subscribe_attempts_total", + "courier_subscribe_failures_total", + )) + + metrics, err := reg.Gather() + assert.NoError(t, err) + + var found bool + for _, metricFamily := range metrics { + if metricFamily.GetName() != "courier_subscribe_latency_seconds" { + continue + } + found = true + + for _, m := range metricFamily.GetMetric() { + assert.EqualValues(t, 1, m.GetHistogram().GetSampleCount()) + } + } + + assert.True(t, found) + }) } func TestSubscribeMultipleTraceSpan(t *testing.T) { @@ -83,7 +126,12 @@ func TestSubscribeMultipleTraceSpan(t *testing.T) { sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) - mwf := NewTracer("test-service", WithTracerProvider(tp)) + reg := prom.NewRegistry() + exporter, err := prometheus.New(prometheus.WithRegisterer(reg)) + require.NoError(t, err) + mp := metric.NewMeterProvider(metric.WithReader(exporter)) + + mwf := New("test-service", WithTracerProvider(tp), WithMeterProvider(mp)) uErr := errors.New("error_from_upstream") u := mwf.SubscriberMiddleware(courier.NewSubscriberFuncs( @@ -95,11 +143,10 @@ func TestSubscribeMultipleTraceSpan(t *testing.T) { }, )) - err := u.SubscribeMultiple(context.Background(), map[string]courier.QOSLevel{ + assert.EqualError(t, u.SubscribeMultiple(context.Background(), map[string]courier.QOSLevel{ "test-topic-1": courier.QOSOne, "test-topic-2": courier.QOSTwo, - }, func(_ context.Context, _ courier.PubSub, _ *courier.Message) {}) - assert.EqualError(t, err, uErr.Error()) + }, func(_ context.Context, _ courier.PubSub, _ *courier.Message) {}), uErr.Error()) spans := sr.Ended() require.Len(t, spans, 1) @@ -132,6 +179,47 @@ func TestSubscribeMultipleTraceSpan(t *testing.T) { assert.Equal(t, codes.Error, span.Status().Code) assert.Equal(t, subscribeMultipleErrMessage, span.Status().Description) }) + + t.Run("Metrics", func(t *testing.T) { + vsn := courier.Version() + buf := bytes.NewBufferString(fmt.Sprintf(`# HELP courier_subscribe_attempts_total Number of subscribe attempts +# TYPE courier_subscribe_attempts_total counter +courier_subscribe_attempts_total{mqtt_qos="1",mqtt_topic="test-topic-1",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +courier_subscribe_attempts_total{mqtt_qos="2",mqtt_topic="test-topic-2",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +# HELP courier_subscribe_failures_total Number of subscribe failures +# TYPE courier_subscribe_failures_total counter +courier_subscribe_failures_total{mqtt_qos="1",mqtt_topic="test-topic-1",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +courier_subscribe_failures_total{mqtt_qos="2",mqtt_topic="test-topic-2",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +`, vsn, vsn, vsn, vsn)) + assert.NoError(t, testutil.GatherAndCompare(reg, buf, + "courier_subscribe_attempts_total", + "courier_subscribe_failures_total", + )) + + metrics, err := reg.Gather() + assert.NoError(t, err) + + var found bool + for _, metricFamily := range metrics { + if metricFamily.GetName() != "courier_subscribe_latency_seconds" { + continue + } + found = true + + for _, m := range metricFamily.GetMetric() { + assert.EqualValues(t, 1, m.GetHistogram().GetSampleCount()) + + for _, labelPair := range m.GetLabel() { + assert.NotEqual(t, + strings.ReplaceAll(string(MQTTTopicWithQoS), ".", "_"), + labelPair.GetName(), + ) + } + } + } + + assert.True(t, found) + }) } func TestSubscriberSpanNotInstrumented(t *testing.T) { @@ -151,28 +239,90 @@ func Test_instrumentCallback(t *testing.T) { sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) + reg := prom.NewRegistry() + exporter, err := prometheus.New(prometheus.WithRegisterer(reg)) + require.NoError(t, err) + mp := metric.NewMeterProvider(metric.WithReader(exporter)) + extFn := func(ctx context.Context) propagation.TextMapCarrier { return ctx.Value(traceparentKey).(propagation.TextMapCarrier) } - m := NewTracer("test-service", + m := New("test-service", WithTracerProvider(tp), + WithMeterProvider(mp), WithTextMapPropagator(propagation.NewCompositeTextMapPropagator(&propagation.TraceContext{})), WithTextMapCarrierExtractFunc(extFn), ) - callback := m.instrumentCallback(func(_ context.Context, _ courier.PubSub, _ *courier.Message) {}) + callback := m.instrumentCallback(func(ctx context.Context, _ courier.PubSub, _ *courier.Message) { + span := oteltrace.SpanFromContext(ctx) + + span.SetAttributes(attribute.String("test-attr", "test-value")) + span.RecordError(errors.New("test-error")) + span.SetStatus(codes.Error, "test-error-msg") + }) c, _ := courier.NewClient() callback(context.WithValue(context.Background(), traceparentKey, &propagation.MapCarrier{ "traceparent": "00-c8e801456e8232f618c49c6f65f101db-1986c136102242cd-01", - }), c, &courier.Message{}) + }), c, &courier.Message{ + Topic: "test-topic", + QoS: courier.QOSOne, + Retained: true, + }) spans := sr.Ended() require.Len(t, spans, 1) span := spans[0] - assert.Equal(t, "c8e801456e8232f618c49c6f65f101db", span.SpanContext().TraceID().String()) - assert.Equal(t, "github.com/gojek/courier-go/otelcourier.Test_instrumentCallback.func2", span.Name()) + + t.Run("Span", func(t *testing.T) { + assert.Equal(t, "c8e801456e8232f618c49c6f65f101db", span.SpanContext().TraceID().String()) + assert.Equal(t, "Test_instrumentCallback.func2", span.Name()) + }) + + t.Run("Attributes", func(t *testing.T) { + got := attribute.NewSet(span.Attributes()...) + expected := attribute.NewSet([]attribute.KeyValue{ + MQTTTopic.String("test-topic"), + MQTTQoS.Int(int(courier.QOSOne)), + MQTTRetained.Bool(true), + semconv.ServiceNameKey.String("test-service"), + semconv.CodeFunction("Test_instrumentCallback.func2"), + semconv.CodeNamespace("github.com/gojek/courier-go/otelcourier"), + attribute.String("test-attr", "test-value"), + }...) + + assert.Equal(t, expected, got) + }) + + t.Run("Events", func(t *testing.T) { + got := attribute.NewSet(span.Events()[0].Attributes...) + expected := attribute.NewSet([]attribute.KeyValue{ + semconv.ExceptionMessageKey.String("test-error"), + semconv.ExceptionTypeKey.String(reflect.TypeOf(errors.New("test-error")).String()), + }...) + + assert.Equal(t, expected, got) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, "test-error-msg", span.Status().Description) + }) + + t.Run("Metrics", func(t *testing.T) { + vsn := courier.Version() + buf := bytes.NewBufferString(fmt.Sprintf(`# HELP courier_subscribe_callback_attempts_total Number of subscribe.callback attempts +# TYPE courier_subscribe_callback_attempts_total counter +courier_subscribe_callback_attempts_total{code_function="Test_instrumentCallback.func2",code_namespace="github.com/gojek/courier-go/otelcourier",mqtt_qos="1",mqtt_retained="true",mqtt_topic="test-topic",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +# HELP courier_subscribe_callback_failures_total Number of subscribe.callback failures +# TYPE courier_subscribe_callback_failures_total counter +courier_subscribe_callback_failures_total{code_function="Test_instrumentCallback.func2",code_namespace="github.com/gojek/courier-go/otelcourier",mqtt_qos="1",mqtt_retained="true",mqtt_topic="test-topic",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +`, vsn, vsn)) + + assert.NoError(t, testutil.GatherAndCompare(reg, buf, + "courier_subscribe_callback_attempts_total", + "courier_subscribe_callback_failures_total", + )) + }) } func Test_instrumentCallbackDisabled(t *testing.T) { @@ -180,7 +330,7 @@ func Test_instrumentCallbackDisabled(t *testing.T) { sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) - m := NewTracer("test-service", WithTracerProvider(tp), DisableCallbackTracing) + m := New("test-service", WithTracerProvider(tp), DisableCallbackTracing) callback := m.instrumentCallback(func(_ context.Context, _ courier.PubSub, _ *courier.Message) {}) diff --git a/otelcourier/trace.go b/otelcourier/trace.go deleted file mode 100644 index 23939f9..0000000 --- a/otelcourier/trace.go +++ /dev/null @@ -1,61 +0,0 @@ -package otelcourier - -import ( - "context" - - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/trace" - - courier "github.com/gojek/courier-go" -) - -const ( - tracerName = "github.com/gojek/courier-go/otelcourier" -) - -// Tracer implements tracing abilities using OpenTelemetry SDK. -type Tracer struct { - service string - tracePaths tracePath - tracer trace.Tracer - propagator propagation.TextMapPropagator - textMapCarrierFunc func(context.Context) propagation.TextMapCarrier -} - -// NewTracer creates a new Tracer with Option(s). -func NewTracer(service string, opts ...Option) *Tracer { - to := defaultOptions() - - for _, opt := range opts { - opt(to) - } - - tracer := to.tracerProvider.Tracer( - tracerName, - trace.WithInstrumentationVersion("semver:"+courier.Version()), - ) - - return &Tracer{ - service: service, - tracer: tracer, - propagator: to.propagator, - textMapCarrierFunc: to.textMapCarrierExtractor, - tracePaths: to.tracePaths, - } -} - -// ApplyTraceMiddlewares will instrument all the operations of a courier.Client instance -// according to Option(s) used. -func (t *Tracer) ApplyTraceMiddlewares(c *courier.Client) { - if t.tracePaths.match(tracePublisher) { - c.UsePublisherMiddleware(t.PublisherMiddleware) - } - - if t.tracePaths.match(traceSubscriber) { - c.UseSubscriberMiddleware(t.SubscriberMiddleware) - } - - if t.tracePaths.match(traceUnsubscriber) { - c.UseUnsubscriberMiddleware(t.UnsubscriberMiddleware) - } -} diff --git a/otelcourier/unsubscribe.go b/otelcourier/unsubscribe.go index 57f864c..0048860 100644 --- a/otelcourier/unsubscribe.go +++ b/otelcourier/unsubscribe.go @@ -2,9 +2,11 @@ package otelcourier import ( "context" + "time" "go.opentelemetry.io/otel/codes" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/otel/trace" courier "github.com/gojek/courier-go" @@ -16,20 +18,43 @@ const ( ) // UnsubscriberMiddleware is a courier.UnsubscriberMiddlewareFunc for tracing unsubscribe calls. -func (t *Tracer) UnsubscriberMiddleware(next courier.Unsubscriber) courier.Unsubscriber { +func (t *OTel) UnsubscriberMiddleware(next courier.Unsubscriber) courier.Unsubscriber { return courier.UnsubscriberFunc(func(ctx context.Context, topics ...string) error { - opts := []trace.SpanStartOption{ - trace.WithAttributes(MQTTTopic.StringSlice(topics)), - trace.WithAttributes(semconv.ServiceNameKey.String(t.service)), - trace.WithSpanKind(trace.SpanKindClient), + unnestMetricAttrs := make([]metric.MeasurementOption, 0, len(topics)) + for _, topic := range topics { + unnestMetricAttrs = append(unnestMetricAttrs, metric.WithAttributes( + semconv.ServiceNameKey.String(t.service), + MQTTTopic.String(t.topicTransformer(ctx, topic)), + )) } - ctx, span := t.tracer.Start(ctx, unsubscribeSpanName, opts...) + + defer func(ctx context.Context, now time.Time, attrs ...metric.MeasurementOption) { + for _, attr := range attrs { + t.rc.recordLatency(ctx, traceUnsubscriber, time.Since(now), attr) + } + }(ctx, t.tnow(), unnestMetricAttrs...) + + ctx, span := t.tracer.Start(ctx, unsubscribeSpanName, + trace.WithAttributes( + semconv.ServiceNameKey.String(t.service), + MQTTTopic.StringSlice(topics), + ), + trace.WithSpanKind(trace.SpanKindClient), + ) defer span.End() + for _, attr := range unnestMetricAttrs { + t.rc.incAttempt(ctx, traceUnsubscriber, attr) + } + err := next.Unsubscribe(ctx, topics...) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, unsubscribeErrMessage) + + for _, attr := range unnestMetricAttrs { + t.rc.incFailure(ctx, traceUnsubscriber, attr) + } } return err diff --git a/otelcourier/unsubscribe_test.go b/otelcourier/unsubscribe_test.go index c0dd2a7..3088b37 100644 --- a/otelcourier/unsubscribe_test.go +++ b/otelcourier/unsubscribe_test.go @@ -1,21 +1,28 @@ package otelcourier import ( + "bytes" "context" "errors" + "fmt" "reflect" + "strings" "testing" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" oteltrace "go.opentelemetry.io/otel/trace" - courier "github.com/gojek/courier-go" + "github.com/gojek/courier-go" ) func TestUnsubscriberTraceSpan(t *testing.T) { @@ -23,15 +30,19 @@ func TestUnsubscriberTraceSpan(t *testing.T) { sr := tracetest.NewSpanRecorder() tp.RegisterSpanProcessor(sr) - mwf := NewTracer("test-service", WithTracerProvider(tp)) + reg := prom.NewRegistry() + exporter, err := prometheus.New(prometheus.WithRegisterer(reg)) + require.NoError(t, err) + mp := metric.NewMeterProvider(metric.WithReader(exporter)) + + mwf := New("test-service", WithTracerProvider(tp), WithMeterProvider(mp)) uErr := errors.New("error_from_upstream") u := mwf.UnsubscriberMiddleware(courier.UnsubscriberFunc(func(ctx context.Context, topics ...string) error { return uErr })) - err := u.Unsubscribe(context.Background(), "test-topic-1", "test-topic-2") - assert.EqualError(t, err, uErr.Error()) + assert.EqualError(t, u.Unsubscribe(context.Background(), "test-topic-1", "test-topic-2"), uErr.Error()) spans := sr.Ended() require.Len(t, spans, 1) @@ -61,6 +72,48 @@ func TestUnsubscriberTraceSpan(t *testing.T) { assert.Equal(t, codes.Error, span.Status().Code) assert.Equal(t, unsubscribeErrMessage, span.Status().Description) }) + + t.Run("Metrics", func(t *testing.T) { + vsn := courier.Version() + buf := bytes.NewBufferString(fmt.Sprintf(`# HELP courier_unsubscribe_attempts_total Number of unsubscribe attempts +# TYPE courier_unsubscribe_attempts_total counter +courier_unsubscribe_attempts_total{mqtt_topic="test-topic-1",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +courier_unsubscribe_attempts_total{mqtt_topic="test-topic-2",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +# HELP courier_unsubscribe_failures_total Number of unsubscribe failures +# TYPE courier_unsubscribe_failures_total counter +courier_unsubscribe_failures_total{mqtt_topic="test-topic-1",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +courier_unsubscribe_failures_total{mqtt_topic="test-topic-2",otel_scope_name="github.com/gojek/courier-go/otelcourier",otel_scope_version="semver:%s",service_name="test-service"} 1 +`, vsn, vsn, vsn, vsn)) + assert.NoError(t, testutil.GatherAndCompare(reg, buf, + "courier_unsubscribe_attempts_total", + "courier_unsubscribe_failures_total", + )) + + metrics, err := reg.Gather() + assert.NoError(t, err) + + var found bool + for _, metricFamily := range metrics { + if metricFamily.GetName() != "courier_unsubscribe_latency_seconds" { + continue + } + found = true + + for _, m := range metricFamily.GetMetric() { + assert.EqualValues(t, 1, m.GetHistogram().GetSampleCount()) + + for _, labelPair := range m.GetLabel() { + // assert that MQTTTopic is not a slice, i.e. MQTTTopic.StringSlice(topics) + if lpName := strings.ReplaceAll(string(MQTTTopic), ".", "_"); labelPair.GetName() == lpName { + assert.False(t, strings.HasPrefix(labelPair.GetValue(), "[")) + assert.False(t, strings.HasSuffix(labelPair.GetValue(), "]")) + } + } + } + } + + assert.True(t, found) + }) } func TestUnsubscriberSpanNotInstrumented(t *testing.T) {