Skip to content

Commit

Permalink
Clarify naming and documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Jan 14, 2023
1 parent f295494 commit 38cd7f4
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 50 deletions.
23 changes: 21 additions & 2 deletions sdk/metric/internal/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK when
// it creates them for multiple views.
// Aggregators need to be comparable so they can be de-duplicated by the SDK
// when it creates them for multiple views.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Expand All @@ -38,3 +38,22 @@ type Aggregator[N int64 | float64] interface {
// measurements made and ends an aggregation cycle.
Aggregation() metricdata.Aggregation
}

// precomputeAggregator is an Aggregator that recieves values to aggregate that
// have been pre-computed by the caller.
type precomputeAggregator[N int64 | float64] interface {
// The Aggregate method of the embedded Aggregator is used to record
// pre-computed measurements, scoped by attributes that have not been
// filtered by an attribute filter.
Aggregator[N]

// aggregateFiltered records measurements scoped by attributes that have
// been filtered by an attribute filter.
//
// Pre-computed measurements of filtered attributes need to be recorded
// separate from those that haven't been filtered so they can be added to
// the non-filtered pre-computed measurements in a collection cycle and
// then resest after the cycle (the non-filtered pre-computed measurements
// are not reset).
aggregateFiltered(N, attribute.Set)
}
51 changes: 33 additions & 18 deletions sdk/metric/internal/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type filterAgg[N int64 | float64] interface {
Aggregator[N]

// filtered records values for attributes that have been filtered.
filtered(N, attribute.Set)
}

// NewFilter wraps an Aggregator with an attribute filtering function.
// NewFilter returns an Aggregator that wraps an agg with an attribute
// filtering function. Both pre-computed non-pre-computed Aggregators can be
// passed for agg. An appropriate Aggregator will be returned for the detected
// type.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
if fa, ok := agg.(filterAgg[N]); ok {
if fa, ok := agg.(precomputeAggregator[N]); ok {
return newPrecomputedFilter(fa, fn)
}
return newFilter(agg, fn)
}

// filter is an aggregator that applies attribute filter when Aggregating. filters
// do not have any backing memory, and must be constructed with a backing Aggregator.
// filter wraps an aggregator with an attribute filter. All recorded
// measurements will have their attributes filtered before they are passed to
// the underlying aggregator's Aggregate method.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]
Expand All @@ -49,6 +49,11 @@ type filter[N int64 | float64] struct {
seen map[attribute.Set]attribute.Set
}

// newFilter returns an filter Aggregator that wraps agg with the attribute
// filter fn.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
return &filter[N]{
filter: fn,
Expand Down Expand Up @@ -78,25 +83,33 @@ func (f *filter[N]) Aggregation() metricdata.Aggregation {
}

// precomputedFilter is an aggregator that applies attribute filter when
// Aggregating for precomputed Aggregations. The precomputed Aggregations need
// to operate normally when no attribute filtering is done (for sums this means
// setting the value), but when attribute filtering is done it needs to be
// added to any set value.
// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
// need to operate normally when no attribute filtering is done (for sums this
// means setting the value), but when attribute filtering is done it needs to
// be added to any set value.
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator filterAgg[N]
aggregator precomputeAggregator[N]

sync.Mutex
seen map[attribute.Set]attribute.Set
}

func newPrecomputedFilter[N int64 | float64](agg filterAgg[N], fn attribute.Filter) *precomputedFilter[N] {
// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
// with the attribute filter fn.
//
// This should not be used to wrap a non-pre-computed Aggregator. Use a
// precomputedFilter instead.
func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}

// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
Expand All @@ -110,10 +123,12 @@ func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
} else {
f.aggregator.filtered(measurement, fAttr)
f.aggregator.aggregateFiltered(measurement, fAttr)
}
}

// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
2 changes: 1 addition & 1 deletion sdk/metric/internal/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
}

// nolint: unused // Used to agg filtered.
func (a *testFilterAgg[N]) filtered(val N, attr attribute.Set) {
func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) {
v := a.values[attr]
v.filtered += val
a.values[attr] = v
Expand Down
72 changes: 55 additions & 17 deletions sdk/metric/internal/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,17 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
return out
}

// precomputedValue is the recorded measurement value for a set of attributes.
type precomputedValue[N int64 | float64] struct {
// measured is the value directly measured.
// measured is the last value measured for a set of attributes that were
// not filtered.
measured N
// filtered is the sum of values from spatially aggregations.
// filtered is the sum of values from measurements that had their
// attributes filtered.
filtered N
}

// valueMap is the storage for precomputed sums.
// precomputedMap is the storage for precomputed sums.
type precomputedMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]precomputedValue[N]
Expand All @@ -177,7 +180,14 @@ func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
}
}

// Aggregate records value as a cumulative sum for attr.
// Aggregate records value with the unfiltered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value overwrite
// that value.
// - If that measurement's attributes were filtered, this value will be
// recorded along side that value.
func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
v := s.values[attr]
Expand All @@ -186,8 +196,18 @@ func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
s.Unlock()
}

// filtered records value with spatially re-aggregated attrs.
func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
// aggregateFiltered records value with the filtered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value will be
// recorded along side that value.
// - If that measurement's attributes were filtered, this value will be
// added to it.
//
// This method should not be used if attr have not been reduced by an attribute
// filter.
func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
s.Lock()
v := s.values[attr]
v.filtered += value
Expand All @@ -196,15 +216,14 @@ func (s *precomputedMap[N]) filtered(value N, attr attribute.Set) { // nolint: u
}

// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
// attributes and the aggregation cycle the measurements were made in.
// pre-computed sums. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// The output Aggregation will report recorded values as delta temporality. It
// is up to the caller to ensure this is accurate.
// The output Aggregation will report recorded values as delta temporality.
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedDeltaSum[N]{
precomputedMap: newPrecomputedMap[N](),
Expand All @@ -214,8 +233,8 @@ func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
}
}

// precomputedDeltaSum summarizes a set of measurements recorded over all
// aggregation cycles as the delta arithmetic sum.
// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all
// aggregation cycles as the delta of these sums.
type precomputedDeltaSum[N int64 | float64] struct {
*precomputedMap[N]

Expand All @@ -225,6 +244,16 @@ type precomputedDeltaSum[N int64 | float64] struct {
start time.Time
}

// Aggregation returns the recorded pre-computed sums as an Aggregation. The
// sum values are expressed as the delta between what was measured this
// collection cycle and the previous.
//
// All pre-computed sums that were recorded for attributes sets reduced by an
// attribute filter (filtered-sums) are summed together and added to any
// pre-computed sum value recorded directly for the resulting attribute set
// (unfiltered-sum). The filtered-sums are reset to zero for the next
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -264,15 +293,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
}

// NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
// attributes and the aggregation cycle the measurements were made in.
// pre-computed sums. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// The output Aggregation will report recorded values as cumulative
// temporality. It is up to the caller to ensure this is accurate.
// temporality.
func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedCumulativeSum[N]{
precomputedMap: newPrecomputedMap[N](),
Expand All @@ -281,15 +310,24 @@ func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N
}
}

// precomputedCumulativeSum summarizes a set of measurements recorded over all
// aggregation cycles directly as the cumulative arithmetic sum.
// precomputedCumulativeSum directly records and reports a set of pre-computed sums.
type precomputedCumulativeSum[N int64 | float64] struct {
*precomputedMap[N]

monotonic bool
start time.Time
}

// Aggregation returns the recorded pre-computed sums as an Aggregation. The
// sum values are expressed directly as they are assumed to be recorded as the
// cumulative sum of a some measured phenomena.
//
// All pre-computed sums that were recorded for attributes sets reduced by an
// attribute filter (filtered-sums) are summed together and added to any
// pre-computed sum value recorded directly for the resulting attribute set
// (unfiltered-sum). The filtered-sums are reset to zero for the next
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()
Expand Down
24 changes: 12 additions & 12 deletions sdk/metric/internal/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestDeltaSumReset(t *testing.T) {
func TestPreComputedDeltaSum(t *testing.T) {
var mono bool
agg := NewPrecomputedDeltaSum[int64](mono)
require.Implements(t, (*filterAgg[int64])(nil), agg)
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)

attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
Expand All @@ -185,7 +185,7 @@ func TestPreComputedDeltaSum(t *testing.T) {
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

agg.(filterAgg[int64]).filtered(1, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
// measured(+): 1, previous(-): 1, filtered(+): 1
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
Expand All @@ -205,8 +205,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
agg.Aggregate(2, attrs)
agg.Aggregate(5, attrs)
// Filtered should add.
agg.(filterAgg[int64]).filtered(3, attrs)
agg.(filterAgg[int64]).filtered(10, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 1, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
Expand All @@ -221,9 +221,9 @@ func TestPreComputedDeltaSum(t *testing.T) {

// Order should not affect measure.
// Filtered should add.
agg.(filterAgg[int64]).filtered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.Aggregate(7, attrs)
agg.(filterAgg[int64]).filtered(10, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 7, previous(-): 5, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)}
Expand All @@ -238,7 +238,7 @@ func TestPreComputedDeltaSum(t *testing.T) {
func TestPreComputedCumulativeSum(t *testing.T) {
var mono bool
agg := NewPrecomputedCumulativeSum[int64](mono)
require.Implements(t, (*filterAgg[int64])(nil), agg)
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)

attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
Expand All @@ -255,7 +255,7 @@ func TestPreComputedCumulativeSum(t *testing.T) {
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)

agg.(filterAgg[int64]).filtered(1, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
Expand All @@ -268,8 +268,8 @@ func TestPreComputedCumulativeSum(t *testing.T) {
// Override set value.
agg.Aggregate(5, attrs)
// Filtered should add.
agg.(filterAgg[int64]).filtered(3, attrs)
agg.(filterAgg[int64]).filtered(10, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
Expand All @@ -281,9 +281,9 @@ func TestPreComputedCumulativeSum(t *testing.T) {

// Order should not affect measure.
// Filtered should add.
agg.(filterAgg[int64]).filtered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.Aggregate(7, attrs)
agg.(filterAgg[int64]).filtered(10, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
Expand Down

0 comments on commit 38cd7f4

Please sign in to comment.