Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cardinality limiting to the metric SDK as an experimental feature #4457

Merged
merged 23 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `go.opentelemetry.io/otel/semconv/v1.24.0` package.
The package contains semantic conventions from the `v1.24.0` version of the OpenTelemetry Semantic Conventions. (#4770)
- Add `WithResourceAsConstantLabels` option to apply resource attributes for every metric emitted by the Prometheus exporter. (#4733)
- Experimental cardinality limiting is added to the metric SDK.
See [metric documentation](./sdk/metric/EXPERIMENTAL.md#cardinality-limit) for more information about this feature and how to enable it. (#4457)

### Changed

Expand Down
50 changes: 50 additions & 0 deletions sdk/metric/EXPERIMENTAL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Experimental Features

The metric SDK contains features that have not yet stabilized in the OpenTelemetry specification.
These features are added to the OpenTelemetry Go metric SDK prior to stabilization in the specification so that users can start experimenting with them and provide feedback.

These feature may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.

## Features

- [Cardinality Limit](#cardinality-limit)

### Cardinality Limit

The cardinality limit is the hard limit on the number of metric streams that can be collected for a single instrument.

This experimental feature can be enabled by setting the `OTEL_GO_X_CARDINALITY_LIMIT` environment value.
The value must be an integer value.
All other values are ignored.

If the value set is less than or equal to `0`, no limit will be applied.

#### Examples

Set the cardinality limit to 2000.

```console
export OTEL_GO_X_CARDINALITY_LIMIT=2000
```

Set an infinite cardinality limit (functionally equivalent to disabling the feature).

```console
export OTEL_GO_X_CARDINALITY_LIMIT=-1
```

Disable the cardinality limit.

```console
unset OTEL_GO_X_CARDINALITY_LIMIT
```

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.

When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version.
If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support.
18 changes: 13 additions & 5 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
// attribute.
//
// If AggregationLimit is less than or equal to zero there will not be an
// aggregation limit imposed (i.e. unlimited attribute sets).
AggregationLimit int
}

func (b Builder[N]) filter(f Measure[N]) Measure[N] {
Expand All @@ -63,7 +71,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N]()
lv := newLastValue[N](b.AggregationLimit)

return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
Expand All @@ -79,7 +87,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newPrecomputedSum[N](monotonic)
s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -90,7 +98,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
s := newSum[N](monotonic)
s := newSum[N](monotonic, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
Expand All @@ -102,7 +110,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum)
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand All @@ -114,7 +122,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
Expand Down
4 changes: 4 additions & 0 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@ var (
keyUser = "user"
userAlice = attribute.String(keyUser, "Alice")
userBob = attribute.String(keyUser, "Bob")
userCarol = attribute.String(keyUser, "Carol")
userDave = attribute.String(keyUser, "Dave")
adminTrue = attribute.Bool("admin", true)
adminFalse = attribute.Bool("admin", false)

alice = attribute.NewSet(userAlice, adminTrue)
bob = attribute.NewSet(userBob, adminFalse)
carol = attribute.NewSet(userCarol, adminFalse)
dave = attribute.NewSet(userDave, adminFalse)

// Filtered.
attrFltr = func(kv attribute.KeyValue) bool {
Expand Down
5 changes: 4 additions & 1 deletion sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,14 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: int(maxScale),

limit: newLimiter[*expoHistogramDataPoint[N]](limit),
values: make(map[attribute.Set]*expoHistogramDataPoint[N]),

start: now(),
Expand All @@ -309,6 +310,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Set]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

Expand All @@ -324,6 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se
e.valuesMu.Lock()
defer e.valuesMu.Unlock()

attr = e.limit.Attributes(attr, e.values)
v, ok := e.values[attr]
if !ok {
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
Expand Down
114 changes: 108 additions & 6 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) {
restore := withHandler(t)
defer restore()

h := newExponentialHistogram[int64](4, 20, false, false)
h := newExponentialHistogram[int64](4, 20, false, false, 0)
for _, v := range tt.values {
h.measure(context.Background(), v, alice)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) {
restore := withHandler(t)
defer restore()

h := newExponentialHistogram[float64](4, 20, false, false)
h := newExponentialHistogram[float64](4, 20, false, false, 0)
for _, v := range tt.values {
h.measure(context.Background(), v, alice)
}
Expand Down Expand Up @@ -747,8 +747,9 @@ func TestExponentialHistogramAggregation(t *testing.T) {

func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 2,
}.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background()
return test[N](in, out, []teststep[N]{
Expand Down Expand Up @@ -805,13 +806,67 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
},
},
},
{
input: []arg[N]{
{ctx, 4, alice},
{ctx, 4, alice},
{ctx, 4, alice},
{ctx, 2, alice},
{ctx, 16, alice},
{ctx, 1, alice},
// These will exceed the cardinality limit.
{ctx, 4, bob},
{ctx, 4, bob},
{ctx, 4, bob},
{ctx, 2, carol},
{ctx, 16, carol},
{ctx, 1, dave},
},
expect: output{
n: 2,
agg: metricdata.ExponentialHistogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Sum: 31,
Scale: -1,
PositiveBucket: metricdata.ExponentialBucket{
Offset: -1,
Counts: []uint64{1, 4, 1},
},
},
{
Attributes: overflowSet,
StartTime: staticTime,
Time: staticTime,
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Sum: 31,
Scale: -1,
PositiveBucket: metricdata.ExponentialBucket{
Offset: -1,
Counts: []uint64{1, 4, 1},
},
},
},
},
},
},
})
}

func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 2,
}.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background()
return test[N](in, out, []teststep[N]{
Expand Down Expand Up @@ -911,6 +966,53 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
},
},
},
{
input: []arg[N]{
// These will exceed the cardinality limit.
{ctx, 4, bob},
{ctx, 4, bob},
{ctx, 4, bob},
{ctx, 2, carol},
{ctx, 16, carol},
{ctx, 1, dave},
},
expect: output{
n: 2,
agg: metricdata.ExponentialHistogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Count: 9,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Sum: 44,
Scale: -1,
PositiveBucket: metricdata.ExponentialBucket{
Offset: -1,
Counts: []uint64{1, 6, 2},
},
},
{
Attributes: overflowSet,
StartTime: staticTime,
Time: staticTime,
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
Sum: 31,
Scale: -1,
PositiveBucket: metricdata.ExponentialBucket{
Offset: -1,
Counts: []uint64{1, 4, 1},
},
},
},
},
},
},
})
}

Expand Down
9 changes: 6 additions & 3 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

limit limiter[*buckets[N]]
values map[attribute.Set]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand All @@ -69,6 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[
return &histValues[N]{
noSum: noSum,
bounds: b,
limit: newLimiter[*buckets[N]](limit),
values: make(map[attribute.Set]*buckets[N]),
}
}
Expand All @@ -86,6 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
s.valuesMu.Lock()
defer s.valuesMu.Unlock()

attr = s.limit.Attributes(attr, s.values)
b, ok := s.values[attr]
if !ok {
// N+1 buckets. For example:
Expand All @@ -108,9 +111,9 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum),
histValues: newHistValues[N](boundaries, noSum, limit),
noMinMax: noMinMax,
start: now(),
}
Expand Down
Loading