Combine spatially aggregated precomputed vals
Fix open-telemetry#3439

When an attribute filter drops a distinguishing attribute during the
aggregation of a precomputed sum, add that value to the existing value
instead of just overriding it (the current behavior).
MrAlias committed Dec 19, 2022
1 parent a724cf8 commit 948932a
Showing 3 changed files with 82 additions and 38 deletions.
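The change described in the commit message can be illustrated with a short, self-contained sketch. The attribute sets, values, and map-based bookkeeping below are illustrative stand-ins, not the SDK's internal types:

```go
// Sketch of the behavior this commit changes, under the assumption that two
// precomputed observations collapse onto one attribute set after filtering.
package main

import "fmt"

func main() {
	// Two precomputed observations, e.g. {"foo":"bar","user":"alice"} -> 1.0
	// and {"foo":"bar","user":"bob"} -> 2.0, where a view's attribute filter
	// drops "user", so both map to the same filtered attribute set.
	values := []float64{1.0, 2.0}
	filteredKey := `{"foo":"bar"}`

	// Previous behavior: each collapsed observation overrides the last,
	// leaving 2.0 for the filtered set.
	override := map[string]float64{}
	for _, v := range values {
		override[filteredKey] = v
	}

	// Behavior after this commit: collapsed observations are summed
	// (spatial re-aggregation), leaving 3.0 for the filtered set.
	summed := map[string]float64{}
	for _, v := range values {
		summed[filteredKey] += v
	}

	fmt.Println(override[filteredKey], summed[filteredKey]) // 2 3
}
```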
31 changes: 27 additions & 4 deletions sdk/metric/internal/filter.go
@@ -21,25 +21,43 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type filteredSet struct {
filtered bool
attrs attribute.Set
}

// filter is an aggregator that applies an attribute filter when Aggregating.
// filters do not have any backing memory, and must be constructed with a
// backing Aggregator.
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]

// Used to aggregate if an aggregator aggregates values differently for
// spatially reaggregated attributes.
filtered func(N, attribute.Set)

sync.Mutex
seen map[attribute.Set]attribute.Set
seen map[attribute.Set]filteredSet
}

// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
af, ok := agg.(interface{ aggregateFiltered(N, attribute.Set) })
if ok {
return &filter[N]{
filter: fn,
aggregator: agg,
filtered: af.aggregateFiltered,
seen: make(map[attribute.Set]filteredSet),
}
}
return &filter[N]{
filter: fn,
aggregator: agg,
seen: map[attribute.Set]attribute.Set{},
seen: make(map[attribute.Set]filteredSet),
}
}

@@ -51,10 +69,15 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
a, na := attr.Filter(f.filter)
fAttr = filteredSet{filtered: len(na) != 0, attrs: a}
f.seen[attr] = fAttr
}
f.aggregator.Aggregate(measurement, fAttr)
if fAttr.filtered && f.filtered != nil {
f.filtered(measurement, fAttr.attrs)
} else {
f.aggregator.Aggregate(measurement, fAttr.attrs)
}
}

// Aggregation returns an Aggregation, for all the aggregated
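The NewFilter change above detects an optional, unexported aggregateFiltered hook via a type assertion and routes filtered measurements through it. A minimal sketch of that routing pattern, using hypothetical aggregator types rather than the SDK's concrete ones:

```go
package main

import "fmt"

// simpleSum has no optional hook; it always takes the plain Aggregate path.
type simpleSum struct{ total float64 }

func (s *simpleSum) Aggregate(v float64) { s.total += v }

// precomputedLike overrides on Aggregate (a precomputed running total) but
// adds on aggregateFiltered, mirroring the distinction this change introduces.
type precomputedLike struct{ total float64 }

func (p *precomputedLike) Aggregate(v float64)         { p.total = v }
func (p *precomputedLike) aggregateFiltered(v float64) { p.total += v }

// route mirrors NewFilter's type assertion: the filtered path is used only
// when the aggregator opts in by implementing aggregateFiltered.
func route(agg interface{ Aggregate(float64) }, v float64, filtered bool) {
	if af, ok := agg.(interface{ aggregateFiltered(float64) }); ok && filtered {
		af.aggregateFiltered(v)
		return
	}
	agg.Aggregate(v)
}

func main() {
	p := &precomputedLike{}
	route(p, 1.0, true) // hook present: values are added
	route(p, 2.0, true)
	s := &simpleSum{}
	route(s, 1.0, true) // no hook: falls back to Aggregate
	route(s, 2.0, true)
	fmt.Println(p.total, s.total) // 3 3
}
```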
12 changes: 12 additions & 0 deletions sdk/metric/internal/sum.go
@@ -201,6 +201,13 @@ func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) {
s.Unlock()
}

// aggregateFiltered records value with spatially re-aggregated attrs.
func (s *precomputedDeltaSum[N]) aggregateFiltered(value N, attr attribute.Set) {
s.Lock()
s.recorded[attr] += value
s.Unlock()
}

func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()
@@ -260,3 +267,8 @@ type precomputedSum[N int64 | float64] struct {
func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
s.set(value, attr)
}

// aggregateFiltered records value with spatially re-aggregated attrs.
func (s *precomputedSum[N]) aggregateFiltered(value N, attr attribute.Set) {
s.valueMap.Aggregate(value, attr)
}
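A simplified model of why precomputedDeltaSum.aggregateFiltered adds into recorded rather than overwriting: several attribute sets that a filter collapses onto one filtered set must be combined before the delta is computed. The reported bookkeeping and collect logic below are assumptions made for this sketch, not the SDK's exact implementation:

```go
package main

import "fmt"

type deltaSum struct {
	recorded map[string]float64 // latest precomputed totals, keyed by filtered attrs
	reported map[string]float64 // totals already emitted in prior collections
}

func newDeltaSum() *deltaSum {
	return &deltaSum{recorded: map[string]float64{}, reported: map[string]float64{}}
}

// aggregate overrides: a precomputed instrument reports its own running total.
func (s *deltaSum) aggregate(v float64, attrs string) { s.recorded[attrs] = v }

// aggregateFiltered adds: observations collapsed onto one filtered attribute
// set are combined instead of letting the last one win.
func (s *deltaSum) aggregateFiltered(v float64, attrs string) { s.recorded[attrs] += v }

// collect emits the change since the last collection (delta temporality).
func (s *deltaSum) collect(attrs string) float64 {
	delta := s.recorded[attrs] - s.reported[attrs]
	s.reported[attrs] = s.recorded[attrs]
	return delta
}

func main() {
	s := newDeltaSum()
	// Two precomputed observations whose distinguishing attribute was filtered out.
	s.aggregateFiltered(1.0, `{"foo":"bar"}`)
	s.aggregateFiltered(2.0, `{"foo":"bar"}`)
	fmt.Println(s.collect(`{"foo":"bar"}`)) // 3
}
```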
77 changes: 43 additions & 34 deletions sdk/metric/meter_test.go
@@ -624,6 +624,11 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
}

func TestAttributeFilter(t *testing.T) {
t.Run("Delta", testAttributeFilter(metricdata.DeltaTemporality))
t.Run("Cumulative", testAttributeFilter(metricdata.CumulativeTemporality))
}

func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
one := 1.0
two := 2.0
testcases := []struct {
@@ -650,10 +655,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@@ -677,10 +682,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@@ -729,10 +734,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@@ -756,10 +761,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@@ -810,7 +815,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@@ -836,7 +841,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@@ -867,7 +872,7 @@ func TestAttributeFilter(t *testing.T) {
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
},
},
},
@@ -892,7 +897,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@@ -918,7 +923,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@@ -949,34 +954,38 @@ func TestAttributeFilter(t *testing.T) {
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
},
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader()
mtr := NewMeterProvider(
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))

m, err := rdr.Collect(context.Background())
assert.NoError(t, err)
return func(t *testing.T) {
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality {
return temporality
}))
mtr := NewMeterProvider(
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))

m, err := rdr.Collect(context.Background())
assert.NoError(t, err)

require.Len(t, m.ScopeMetrics, 1)
require.Len(t, m.ScopeMetrics[0].Metrics, 1)
require.Len(t, m.ScopeMetrics, 1)
require.Len(t, m.ScopeMetrics[0].Metrics, 1)

metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
})
metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
}
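The test refactor above uses a common Go pattern: a helper parameterized by temporality returns a func(*testing.T) closure, so the same table of cases runs under both t.Run("Delta", ...) and t.Run("Cumulative", ...). A minimal sketch of that pattern with illustrative names, not the actual test code:

```go
package example

import "testing"

// testWithMode returns a subtest closed over its parameter so the same
// cases can be registered once per mode.
func testWithMode(mode string) func(*testing.T) {
	return func(t *testing.T) {
		t.Run("case", func(t *testing.T) {
			if mode == "" {
				t.Fatal("mode must be set")
			}
		})
	}
}

func TestModes(t *testing.T) {
	t.Run("Delta", testWithMode("delta"))
	t.Run("Cumulative", testWithMode("cumulative"))
}
```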

