diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cb2b592746..9f0e0943382 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520) +### Added + +- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524) + ## [1.11.2/0.34.0] 2022-12-05 ### Added diff --git a/sdk/metric/config.go b/sdk/metric/config.go index c78b0416415..c837df8b76f 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -54,14 +54,19 @@ func unify(funcs []func(context.Context) error) func(context.Context) error { errs = append(errs, err) } } - switch len(errs) { - case 0: - return nil - case 1: - return errs[0] - default: - return fmt.Errorf("%v", errs) - } + return unifyErrors(errs) + } +} + +// unifyErrors combines multiple errors into a single error. +func unifyErrors(errs []error) error { + switch len(errs) { + case 0: + return nil + case 1: + return errs[0] + default: + return fmt.Errorf("%v", errs) } } diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index a924d879d00..dc5eff2eee2 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -28,12 +28,13 @@ import ( ) type reader struct { - producer producer - temporalityFunc TemporalitySelector - aggregationFunc AggregationSelector - collectFunc func(context.Context) (metricdata.ResourceMetrics, error) - forceFlushFunc func(context.Context) error - shutdownFunc func(context.Context) error + producer sdkProducer + externalProducers []Producer + temporalityFunc TemporalitySelector + aggregationFunc AggregationSelector + collectFunc func(context.Context) (metricdata.ResourceMetrics, error) + forceFlushFunc func(context.Context) error + shutdownFunc func(context.Context) error } var _ Reader = (*reader)(nil) @@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n return r.aggregationFunc(kind) } -func (r *reader) register(p producer) { r.producer = p } +func (r *reader) register(p sdkProducer) { r.producer = p } +func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) } func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { return r.temporalityFunc(kind) } diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 0ebfadf33a3..48a8b291e77 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -28,9 +28,13 @@ import ( // manualReader is a simple Reader that allows an application to // read metrics on demand. type manualReader struct { - producer atomic.Value + sdkProducer atomic.Value shutdownOnce sync.Once + mu sync.Mutex + isShutdown bool + externalProducers atomic.Value + temporalitySelector TemporalitySelector aggregationSelector AggregationSelector } @@ -41,22 +45,39 @@ var _ = map[Reader]struct{}{&manualReader{}: {}} // NewManualReader returns a Reader which is directly called to collect metrics. func NewManualReader(opts ...ManualReaderOption) Reader { cfg := newManualReaderConfig(opts) - return &manualReader{ + r := &manualReader{ temporalitySelector: cfg.temporalitySelector, aggregationSelector: cfg.aggregationSelector, } + r.externalProducers.Store([]Producer{}) + return r } -// register stores the Producer which enables the caller to read -// metrics on demand. -func (mr *manualReader) register(p producer) { +// register stores the sdkProducer which enables the caller +// to read metrics from the SDK on demand. +func (mr *manualReader) register(p sdkProducer) { // Only register once. If producer is already set, do nothing. - if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register manual reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer stores the external Producer which enables the caller +// to read metrics on demand. +func (mr *manualReader) RegisterProducer(p Producer) { + mr.mu.Lock() + defer mr.mu.Unlock() + if mr.isShutdown { + return + } + currentProducers := mr.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + mr.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality { return mr.temporalitySelector(kind) @@ -77,18 +98,23 @@ func (mr *manualReader) Shutdown(context.Context) error { err := ErrReaderShutdown mr.shutdownOnce.Do(func() { // Any future call to Collect will now return ErrReaderShutdown. - mr.producer.Store(produceHolder{ + mr.sdkProducer.Store(produceHolder{ produce: shutdownProducer{}.produce, }) + mr.mu.Lock() + defer mr.mu.Unlock() + mr.isShutdown = true + // release references to Producer(s) + mr.externalProducers.Store([]Producer{}) err = nil }) return err } -// Collect gathers all metrics from the SDK, calling any callbacks necessary. -// Collect will return an error if called after shutdown. +// Collect gathers all metrics from the SDK and other Producers, calling any +// callbacks necessary. Collect will return an error if called after shutdown. func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - p := mr.producer.Load() + p := mr.sdkProducer.Load() if p == nil { return metricdata.ResourceMetrics{}, ErrReaderNotRegistered } @@ -103,7 +129,19 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + var errs []error + for _, producer := range mr.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + errs = append(errs, err) + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, unifyErrors(errs) } // manualReaderConfig contains configuration options for a ManualReader. diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 00ba1305595..8425e42e16a 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade cancel: cancel, done: make(chan struct{}), } + r.externalProducers.Store([]Producer{}) go func() { defer func() { close(r.done) }() @@ -126,7 +127,11 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade // periodicReader is a Reader that continuously collects and exports metric // data at a set interval. type periodicReader struct { - producer atomic.Value + sdkProducer atomic.Value + + mu sync.Mutex + isShutdown bool + externalProducers atomic.Value timeout time.Duration exporter Exporter @@ -166,14 +171,28 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) { } // register registers p as the producer of this reader. -func (r *periodicReader) register(p producer) { +func (r *periodicReader) register(p sdkProducer) { // Only register once. If producer is already set, do nothing. - if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { + if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { msg := "did not register periodic reader" global.Error(errDuplicateRegister, msg) } } +// RegisterProducer registers p as an external Producer of this reader. +func (r *periodicReader) RegisterProducer(p Producer) { + r.mu.Lock() + defer r.mu.Unlock() + if r.isShutdown { + return + } + currentProducers := r.externalProducers.Load().([]Producer) + newProducers := []Producer{} + newProducers = append(newProducers, currentProducers...) + newProducers = append(newProducers, p) + r.externalProducers.Store(newProducers) +} + // temporality reports the Temporality for the instrument kind provided. func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality { return r.exporter.Temporality(kind) @@ -195,12 +214,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error { } // Collect gathers and returns all metric data related to the Reader from -// the SDK. The returned metric data is not exported to the configured -// exporter, it is left to the caller to handle that if desired. +// the SDK and other Producers. The returned metric data is not exported +// to the configured exporter, it is left to the caller to handle that if +// desired. // // An error is returned if this is called after Shutdown. func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - return r.collect(ctx, r.producer.Load()) + return r.collect(ctx, r.sdkProducer.Load()) } // collect unwraps p as a produceHolder and returns its produce results. @@ -218,7 +238,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata err := fmt.Errorf("periodic reader: invalid producer: %T", p) return metricdata.ResourceMetrics{}, err } - return ph.produce(ctx) + + rm, err := ph.produce(ctx) + if err != nil { + return metricdata.ResourceMetrics{}, err + } + var errs []error + for _, producer := range r.externalProducers.Load().([]Producer) { + externalMetrics, err := producer.Produce(ctx) + if err != nil { + errs = append(errs, err) + } + rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) + } + return rm, unifyErrors(errs) } // export exports metric data m using r's exporter. @@ -259,7 +292,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { <-r.done // Any future call to Collect will now return ErrReaderShutdown. - ph := r.producer.Swap(produceHolder{ + ph := r.sdkProducer.Swap(produceHolder{ produce: shutdownProducer{}.produce, }) @@ -276,6 +309,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error { if err == nil || err == ErrReaderShutdown { err = sErr } + + r.mu.Lock() + defer r.mu.Unlock() + r.isShutdown = true + // release references to Producer(s) + r.externalProducers.Store([]Producer{}) }) return err } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index d48c1a7de8e..138aae48944 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -114,7 +114,8 @@ func (ts *periodicReaderTestSuite) SetupTest() { } ts.ErrReader = NewPeriodicReader(e) - ts.ErrReader.register(testProducer{}) + ts.ErrReader.register(testSDKProducer{}) + ts.ErrReader.RegisterProducer(testExternalProducer{}) } func (ts *periodicReaderTestSuite) TearDownTest() { @@ -186,14 +187,15 @@ func TestPeriodicReaderRun(t *testing.T) { exp := &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testProducer produces testMetrics. - assert.Equal(t, testMetrics, m) + // The testSDKProducer produces testResourceMetricsAB. + assert.Equal(t, testResourceMetricsAB, m) return assert.AnError }, } r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -210,8 +212,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { called = new(bool) return &fnExporter{ exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error { - // The testProducer produces testMetrics. - assert.Equal(t, testMetrics, m) + // The testSDKProducer produces testResourceMetricsA. + assert.Equal(t, testResourceMetricsAB, m) *called = true return assert.AnError }, @@ -221,7 +223,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -232,7 +235,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) r := NewPeriodicReader(exp) - r.register(testProducer{}) + r.register(testSDKProducer{}) + r.RegisterProducer(testExternalProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") }) diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index aa9d50ef666..c52cc58dff2 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -51,7 +51,12 @@ type Reader interface { // register registers a Reader with a MeterProvider. // The producer argument allows the Reader to signal the sdk to collect // and send aggregated metric measurements. - register(producer) + register(sdkProducer) + + // RegisterProducer registers a an external Producer with this Reader. + // The Producer is used as a source of aggregated metric data which is + // incorporated into metrics collected from the SDK. + RegisterProducer(Producer) // temporality reports the Temporality for the instrument kind provided. temporality(InstrumentKind) metricdata.Temporality @@ -84,14 +89,22 @@ type Reader interface { Shutdown(context.Context) error } -// producer produces metrics for a Reader. -type producer interface { +// sdkProducer produces metrics for a Reader. +type sdkProducer interface { // produce returns aggregated metrics from a single collection. // // This method is safe to call concurrently. produce(context.Context) (metricdata.ResourceMetrics, error) } +// Producer produces metrics for a Reader from an external source. +type Producer interface { + // Produce returns aggregated metrics from an external source. + // + // This method should be safe to call concurrently. + Produce(context.Context) ([]metricdata.ScopeMetrics, error) +} + // produceHolder is used as an atomic.Value to wrap the non-concrete producer // type. type produceHolder struct { diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 28b249bd3e2..191ab39945b 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -57,16 +57,25 @@ func (ts *readerTestSuite) TestErrorForNotRegistered() { ts.ErrorIs(err, ErrReaderNotRegistered) } -func (ts *readerTestSuite) TestProducer() { - ts.Reader.register(testProducer{}) +func (ts *readerTestSuite) TestSDKProducer() { + ts.Reader.register(testSDKProducer{}) m, err := ts.Reader.Collect(context.Background()) ts.NoError(err) - ts.Equal(testMetrics, m) + ts.Equal(testResourceMetricsA, m) +} + +func (ts *readerTestSuite) TestExternalProducer() { + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) + m, err := ts.Reader.Collect(context.Background()) + ts.NoError(err) + ts.Equal(testResourceMetricsAB, m) } func (ts *readerTestSuite) TestCollectAfterShutdown() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) m, err := ts.Reader.Collect(ctx) @@ -76,27 +85,29 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { func (ts *readerTestSuite) TestShutdownTwice() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown) } func (ts *readerTestSuite) TestMultipleForceFlush() { ctx := context.Background() - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.ForceFlush(ctx)) ts.NoError(ts.Reader.ForceFlush(ctx)) } func (ts *readerTestSuite) TestMultipleRegister() { - p0 := testProducer{ + p0 := testSDKProducer{ produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { // Differentiate this producer from the second by returning an // error. - return testMetrics, assert.AnError + return testResourceMetricsA, assert.AnError }, } - p1 := testProducer{} + p1 := testSDKProducer{} ts.Reader.register(p0) // This should be ignored. @@ -106,11 +117,46 @@ func (ts *readerTestSuite) TestMultipleRegister() { ts.Equal(assert.AnError, err) } +func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer( + testExternalProducer{ + produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{}, assert.AnError + }, + }, + ) + ts.Reader.RegisterProducer( + testExternalProducer{ + produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + return []metricdata.ScopeMetrics{testScopeMetricsB}, nil + }, + }, + ) + + m, err := ts.Reader.Collect(context.Background()) + ts.Equal(assert.AnError, err) + ts.Equal(testResourceMetricsAB, m) +} + +func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { + ts.Reader.register(testSDKProducer{ + produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) { + return metricdata.ResourceMetrics{}, assert.AnError + }}) + ts.Reader.RegisterProducer(testExternalProducer{}) + + m, err := ts.Reader.Collect(context.Background()) + ts.Equal(assert.AnError, err) + ts.Equal(metricdata.ResourceMetrics{}, m) +} + func (ts *readerTestSuite) TestMethodConcurrency() { // Requires the race-detector (a default test option for the project). // All reader methods should be concurrent-safe. - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) ctx := context.Background() var wg sync.WaitGroup @@ -141,49 +187,85 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ctx := context.Background() ts.Require().NoError(ts.Reader.Shutdown(ctx)) // Registering after shutdown should not revert the shutdown. - ts.Reader.register(testProducer{}) + ts.Reader.register(testSDKProducer{}) + ts.Reader.RegisterProducer(testExternalProducer{}) m, err := ts.Reader.Collect(ctx) ts.ErrorIs(err, ErrReaderShutdown) ts.Equal(metricdata.ResourceMetrics{}, m) } -var testMetrics = metricdata.ResourceMetrics{ - Resource: resource.NewSchemaless(attribute.String("test", "Reader")), - ScopeMetrics: []metricdata.ScopeMetrics{{ - Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, - Metrics: []metricdata.Metrics{{ - Name: "fake data", - Description: "Data used to test a reader", - Unit: unit.Dimensionless, - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{{ - Attributes: attribute.NewSet(attribute.String("user", "alice")), - StartTime: time.Now(), - Time: time.Now().Add(time.Second), - Value: -1, - }}, - }, - }}, +var testScopeMetricsA = metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, + Metrics: []metricdata.Metrics{{ + Name: "fake data", + Description: "Data used to test a reader", + Unit: unit.Dimensionless, + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{{ + Attributes: attribute.NewSet(attribute.String("user", "alice")), + StartTime: time.Now(), + Time: time.Now().Add(time.Second), + Value: -1, + }}, + }, + }}, +} + +var testScopeMetricsB = metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/external"}, + Metrics: []metricdata.Metrics{{ + Name: "fake scope data", + Description: "Data used to test a Producer reader", + Unit: unit.Milliseconds, + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{{ + Attributes: attribute.NewSet(attribute.String("user", "ben")), + StartTime: time.Now(), + Time: time.Now().Add(time.Second), + Value: 10, + }}, + }, }}, } -type testProducer struct { +var testResourceMetricsA = metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless(attribute.String("test", "Reader")), + ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA}, +} + +var testResourceMetricsAB = metricdata.ResourceMetrics{ + Resource: resource.NewSchemaless(attribute.String("test", "Reader")), + ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA, testScopeMetricsB}, +} + +type testSDKProducer struct { produceFunc func(context.Context) (metricdata.ResourceMetrics, error) } -func (p testProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { +func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { + if p.produceFunc != nil { + return p.produceFunc(ctx) + } + return testResourceMetricsA, nil +} + +type testExternalProducer struct { + produceFunc func(context.Context) ([]metricdata.ScopeMetrics, error) +} + +func (p testExternalProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { if p.produceFunc != nil { return p.produceFunc(ctx) } - return testMetrics, nil + return []metricdata.ScopeMetrics{testScopeMetricsB}, nil } func benchReaderCollectFunc(r Reader) func(*testing.B) { ctx := context.Background() - r.register(testProducer{}) + r.register(testSDKProducer{}) // Store bechmark results in a closure to prevent the compiler from // inlining and skipping the function. @@ -198,7 +280,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) { for n := 0; n < b.N; n++ { collectedMetrics, err = r.Collect(ctx) - assert.Equalf(b, testMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) + assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) } } }