Skip to content

Commit

Permalink
paralellize exports [goreleaser]
Browse files Browse the repository at this point in the history
  • Loading branch information
trajan0x committed Sep 11, 2024
1 parent 732f5fa commit 8a04f9e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 22 deletions.
4 changes: 3 additions & 1 deletion core/metrics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ The additional environment variables to note are:
| `OTEL_EXPORTER_OTLP_TRANSPORT_3` | The transport protocol for the third additional OTLP exporter | `http` |
| ... | Additional transports can be specified by incrementing the number | `http` |

<!-- TODO: fully document before pr merged-->
You can do the same thing for `OTEL_EXPORTER_OTLP_SECURE_MODE` and `OTEL_EXPORTER_OTLP_HEADERS`

<!-- TODO: fully document these optins-->

Note: The OTLP exporter endpoints and transports can be specified for multiple exporters by using incrementing numbers (1, 2, 3, etc.) in the environment variable names. This allows for configuration of multiple OTLP exporters. The primary exporter uses the base names without numbers.

Expand Down
54 changes: 40 additions & 14 deletions core/metrics/multiexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package metrics
import (
"context"
"fmt"
"go.uber.org/multierr"
"sync"
"time"

"go.opentelemetry.io/otel/sdk/trace"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
)

// MultiExporter is an interface that allows exporting spans to multiple OTLP trace exporters.
Expand All @@ -27,31 +29,55 @@ func NewMultiExporter(exporters ...trace.SpanExporter) MultiExporter {
}
}

const defaultTimeout = 30 * time.Second

// ExportSpans exports a batch of spans.
func (m *multiExporter) ExportSpans(ctx context.Context, ss []trace.ReadOnlySpan) error {
func (m *multiExporter) ExportSpans(parentCtx context.Context, ss []trace.ReadOnlySpan) error {
return m.doParallel(parentCtx, func(ctx context.Context, exporter trace.SpanExporter) error {
return exporter.ExportSpans(ctx, ss)
})
}

func (m *multiExporter) doParallel(parentCtx context.Context, fn func(context.Context, trace.SpanExporter) error) error {
ctx, cancel := context.WithTimeout(parentCtx, defaultTimeout)
defer cancel()

var wg sync.WaitGroup
var errors []error
var mu sync.Mutex

wg.Add(len(m.exporters))
for _, exporter := range m.exporters {

Check warning on line 50 in core/metrics/multiexporter.go

View check run for this annotation

Codecov / codecov/patch

core/metrics/multiexporter.go#L50

Added line #L50 was not covered by tests
err := exporter.ExportSpans(ctx, ss)
if err != nil {
return fmt.Errorf("could not export spans: %w", err)
}
go func(exporter trace.SpanExporter) {
defer wg.Done()
err := fn(ctx, exporter)
if err != nil {
mu.Lock()
errors = append(errors, fmt.Errorf("error in doMultiple: %w", err))
mu.Unlock()
}
}(exporter)
}

wg.Wait()
if len(errors) > 0 {
// nolint: wrapcheck
return multierr.Combine(errors...)
}

return nil

Check warning on line 68 in core/metrics/multiexporter.go

View check run for this annotation

Codecov / codecov/patch

core/metrics/multiexporter.go#L68

Added line #L68 was not covered by tests
}

// Shutdown notifies the exporter of a pending halt to operations.
func (m *multiExporter) Shutdown(ctx context.Context) error {
for _, exporter := range m.exporters {
err := exporter.Shutdown(ctx)
if err != nil {
return fmt.Errorf("could not stop exporter: %w", err)
}
}
return nil
return m.doParallel(ctx, func(ctx context.Context, exporter trace.SpanExporter) error {
return exporter.Shutdown(ctx)
})
}

// AddExporter adds an exporter to the multi exporter.
func (m *multiExporter) AddExporter(exporter trace.SpanExporter) {
m.exporters = append(m.exporters, exporter)
}

var _ tracesdk.SpanExporter = &multiExporter{}
var _ trace.SpanExporter = &multiExporter{}
15 changes: 8 additions & 7 deletions core/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (n *otlpHandler) Start(ctx context.Context) (err error) {

exporter, err := makeOTLPExporter(ctx, envSuffix)
if err != nil {
return fmt.Errorf("could not create exporter %d: %v", i, err)
return fmt.Errorf("could not create exporter %d: %w", i, err)
}

Check warning on line 55 in core/metrics/otlp.go

View check run for this annotation

Codecov / codecov/patch

core/metrics/otlp.go#L55

Added line #L55 was not covered by tests

exporters = append(exporters, exporter)
Expand Down Expand Up @@ -163,10 +163,10 @@ func makeOTLPExporter(ctx context.Context, envSuffix string) (*otlptrace.Exporte

oteltraceClient, err := buildClientFromTransport(
transport,
WithURL(url),
withURL(url),
// defaults to true
WithSecure(secure),
WithHeaders(headers),
withSecure(secure),
withHeaders(headers),
)
if err != nil {
return nil, fmt.Errorf("could not create client from transport: %w", err)
Expand Down Expand Up @@ -211,7 +211,7 @@ type transportOptions struct {
// only one will be used in creating the actual client.
type Option func(*transportOptions) error

func WithURL(url string) Option {
func withURL(url string) Option {
return func(o *transportOptions) error {
o.httpOptions = append(o.httpOptions, otlptracehttp.WithEndpointURL(url))
o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithEndpointURL(url))
Expand All @@ -220,14 +220,15 @@ func WithURL(url string) Option {
}
}

func WithSecure(secure bool) Option {
func withSecure(secure bool) Option {
return func(o *transportOptions) error {
if secure {
tlsCreds := credentials.NewClientTLSFromCert(nil, "")
// note: you do not need to specify the tls creds for http, this happens automatically when https:// is used as the protocol scheme.
o.grpcOptions = append(o.grpcOptions, otlptracegrpc.WithTLSCredentials(tlsCreds))

tlsConfig := &tls.Config{
MinVersion: tls.VersionTLS12,
// RootCAs is nil, which means the default system root CAs are used
}
o.httpOptions = append(o.httpOptions, otlptracehttp.WithTLSClientConfig(tlsConfig))
Expand All @@ -240,7 +241,7 @@ func WithSecure(secure bool) Option {
}
}

func WithHeaders(headers string) Option {
func withHeaders(headers string) Option {
return func(o *transportOptions) error {
realHeaders := headersToMap(headers)
o.httpOptions = append(o.httpOptions, otlptracehttp.WithHeaders(realHeaders))
Expand Down

0 comments on commit 8a04f9e

Please sign in to comment.