diff --git a/metric/metricdata/exemplar.go b/metric/metricdata/exemplar.go index cdbeef058..4e474e39b 100644 --- a/metric/metricdata/exemplar.go +++ b/metric/metricdata/exemplar.go @@ -16,6 +16,13 @@ package metricdata import ( "time" + + "go.opencensus.io/trace" +) + +// Exemplars keys. +const ( + AttachmentKeySpanContext = "SpanContext" ) // Exemplar is an example data point associated with each bucket of a @@ -24,10 +31,23 @@ import ( // Their purpose is to provide an example of the kind of thing // (request, RPC, trace span, etc.) that resulted in that measurement. type Exemplar struct { - Value float64 // the value that was recorded - Timestamp time.Time // the time the value was recorded - Attachments Attachments // attachments (if any) + Value float64 // the value that was recorded + Timestamp time.Time // the time the value was recorded + Attachments map[string]interface{} // attachments (if any) +} + +// Attachment is a key-value pair associated with a recorded example data point. +type Attachment struct { + Key string + Value interface{} +} + +// SpanContextAttachment returns a span context valued attachment. +func SpanContextAttachment(key string, value trace.SpanContext) Attachment { + return Attachment{Key: key, Value: value} } -// Attachments is a map of extra values associated with a recorded data point. -type Attachments map[string]interface{} +// StringAttachment returns a string attachment. +func StringAttachment(key string, value string) Attachment { + return Attachment{Key: key, Value: value} +} diff --git a/stats/internal/record.go b/stats/internal/record.go index 36935e629..7f9b265ca 100644 --- a/stats/internal/record.go +++ b/stats/internal/record.go @@ -15,11 +15,12 @@ package internal import ( + "go.opencensus.io/metric/metricdata" "go.opencensus.io/tag" ) // DefaultRecorder will be called for each Record call. -var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments map[string]interface{}) +var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments []metricdata.Attachment) // SubscriptionReporter reports when a view subscribed with a measure. var SubscriptionReporter func(measure string) diff --git a/stats/record.go b/stats/record.go index d2af0a60d..927579994 100644 --- a/stats/record.go +++ b/stats/record.go @@ -18,6 +18,7 @@ package stats import ( "context" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/stats/internal" "go.opencensus.io/tag" ) @@ -33,6 +34,12 @@ func init() { // Record records one or multiple measurements with the same context at once. // If there are any tags in the context, measurements will be tagged with them. func Record(ctx context.Context, ms ...Measurement) { + RecordWithAttachments(ctx, nil, ms...) +} + +// RecordWithAttachments records measurements and the given exemplar attachments against context. +// If there are any tags in the context, measurements will be tagged with them. +func RecordWithAttachments(ctx context.Context, attachments []metricdata.Attachment, ms ...Measurement) { recorder := internal.DefaultRecorder if recorder == nil { return @@ -50,8 +57,7 @@ func Record(ctx context.Context, ms ...Measurement) { if !record { return } - // TODO(songy23): fix attachments. - recorder(tag.FromContext(ctx), ms, map[string]interface{}{}) + recorder(tag.FromContext(ctx), ms, attachments) } // RecordWithTags records one or multiple measurements at once. @@ -60,10 +66,19 @@ func Record(ctx context.Context, ms ...Measurement) { // RecordWithTags is useful if you want to record with tag mutations but don't want // to propagate the mutations in the context. func RecordWithTags(ctx context.Context, mutators []tag.Mutator, ms ...Measurement) error { + return RecordWithTagsAndAttachments(ctx, mutators, nil, ms...) +} + +// RecordWithTagsAndAttachments records measurements and the given exemplar attachments at once. +// +// Measurements will be tagged with the tags in the context mutated by the mutators. +// RecordWithTags is useful if you want to record with tag mutations but don't want +// to propagate the mutations in the context. +func RecordWithTagsAndAttachments(ctx context.Context, mutators []tag.Mutator, attachments []metricdata.Attachment, ms ...Measurement) error { ctx, err := tag.New(ctx, mutators...) if err != nil { return err } - Record(ctx, ms...) + RecordWithAttachments(ctx, attachments, ms...) return nil } diff --git a/stats/record_test.go b/stats/record_test.go new file mode 100644 index 000000000..67a298c1f --- /dev/null +++ b/stats/record_test.go @@ -0,0 +1,95 @@ +// Copyright 2019, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stats_test + +import ( + "context" + "log" + "reflect" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opencensus.io/trace" +) + +var ( + tid = trace.TraceID{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 4, 8, 16, 32, 64, 128} + sid = trace.SpanID{1, 2, 4, 8, 16, 32, 64, 128} + spanCtx = trace.SpanContext{ + TraceID: tid, + SpanID: sid, + TraceOptions: 1, + } +) + +func TestRecordWithAttachments(t *testing.T) { + k1, _ := tag.NewKey("k1") + k2, _ := tag.NewKey("k2") + distribution := view.Distribution(5, 10) + m := stats.Int64("TestRecordWithAttachments/m1", "", stats.UnitDimensionless) + v := &view.View{ + Name: "test_view", + TagKeys: []tag.Key{k1, k2}, + Measure: m, + Aggregation: distribution, + } + view.SetReportingPeriod(100 * time.Millisecond) + if err := view.Register(v); err != nil { + log.Fatalf("Failed to register views: %v", err) + } + + attachments := []metricdata.Attachment{metricdata.SpanContextAttachment(metricdata.AttachmentKeySpanContext, spanCtx)} + stats.RecordWithAttachments(context.Background(), attachments, m.M(12)) + rows, err := view.RetrieveData("test_view") + if err != nil { + t.Errorf("Failed to retrieve data %v", err) + } + if len(rows) == 0 { + t.Errorf("No data was recorded.") + } + data := rows[0].Data + dis, ok := data.(*view.DistributionData) + if !ok { + t.Errorf("want DistributionData, got %+v", data) + } + wantBuckets := []int64{0, 0, 1} + if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) { + t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket) + } + for i, e := range dis.ExemplarsPerBucket { + // Exemplar slice should be [nil, nil, exemplar] + if i != 2 && e != nil { + t.Errorf("want nil exemplar, got %v", e) + } + if i == 2 { + wantExemplar := &metricdata.Exemplar{Value: 12, Attachments: map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}} + if diff := cmpExemplar(e, wantExemplar); diff != "" { + t.Fatalf("Unexpected Exemplar -got +want: %s", diff) + } + } + } +} + +// Compare exemplars while ignoring exemplar timestamp, since timestamp is non-deterministic. +func cmpExemplar(got, want *metricdata.Exemplar) string { + return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{})) +} diff --git a/stats/view/aggregation_data.go b/stats/view/aggregation_data.go index d500e67f7..5206ae5fa 100644 --- a/stats/view/aggregation_data.go +++ b/stats/view/aggregation_data.go @@ -27,7 +27,7 @@ import ( // Mosts users won't directly access aggregration data. type AggregationData interface { isAggregationData() bool - addSample(v float64, attachments map[string]interface{}, t time.Time) + addSample(v float64, attachments []metricdata.Attachment, t time.Time) clone() AggregationData equal(other AggregationData) bool toPoint(t metricdata.Type, time time.Time) metricdata.Point @@ -45,7 +45,7 @@ type CountData struct { func (a *CountData) isAggregationData() bool { return true } -func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) { +func (a *CountData) addSample(_ float64, _ []metricdata.Attachment, _ time.Time) { a.Value = a.Value + 1 } @@ -81,7 +81,7 @@ type SumData struct { func (a *SumData) isAggregationData() bool { return true } -func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) { +func (a *SumData) addSample(v float64, _ []metricdata.Attachment, _ time.Time) { a.Value += v } @@ -152,7 +152,7 @@ func (a *DistributionData) variance() float64 { func (a *DistributionData) isAggregationData() bool { return true } // TODO(songy23): support exemplar attachments. -func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) { +func (a *DistributionData) addSample(v float64, attachments []metricdata.Attachment, t time.Time) { if v < a.Min { a.Min = v } @@ -172,7 +172,7 @@ func (a *DistributionData) addSample(v float64, attachments map[string]interface a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean) } -func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) { +func (a *DistributionData) addToBucket(v float64, attachments []metricdata.Attachment, t time.Time) { var count *int64 var i int var b float64 @@ -192,14 +192,18 @@ func (a *DistributionData) addToBucket(v float64, attachments map[string]interfa } } -func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar { +func getExemplar(v float64, attachments []metricdata.Attachment, t time.Time) *metricdata.Exemplar { if len(attachments) == 0 { return nil } + attachmentMap := make(map[string]interface{}) + for _, attachment := range attachments { + attachmentMap[attachment.Key] = attachment.Value + } return &metricdata.Exemplar{ Value: v, Timestamp: t, - Attachments: attachments, + Attachments: attachmentMap, } } @@ -265,7 +269,7 @@ func (l *LastValueData) isAggregationData() bool { return true } -func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) { +func (l *LastValueData) addSample(v float64, _ []metricdata.Attachment, _ time.Time) { l.Value = v } diff --git a/stats/view/aggregation_data_test.go b/stats/view/aggregation_data_test.go index a7e056752..c9fbd2d1f 100644 --- a/stats/view/aggregation_data_test.go +++ b/stats/view/aggregation_data_test.go @@ -67,11 +67,11 @@ func TestDataClone(t *testing.T) { func TestDistributionData_addSample(t *testing.T) { dd := newDistributionData([]float64{1, 2}) - attachments1 := map[string]interface{}{"key1": "value1"} + attachments1 := []metricdata.Attachment{metricdata.StringAttachment("k", "v")} t1 := time.Now() dd.addSample(0.5, attachments1, t1) - e1 := &metricdata.Exemplar{Value: 0.5, Timestamp: t1, Attachments: attachments1} + e1 := &metricdata.Exemplar{Value: 0.5, Timestamp: t1, Attachments: map[string]interface{}{"k": "v"}} want := &DistributionData{ Count: 1, CountPerBucket: []int64{1, 0, 0}, @@ -85,12 +85,12 @@ func TestDistributionData_addSample(t *testing.T) { t.Fatalf("Unexpected DistributionData -got +want: %s", diff) } - attachments2 := map[string]interface{}{"key2": "value2"} + attachments2 := []metricdata.Attachment{metricdata.StringAttachment("k2", "v2")} t2 := t1.Add(time.Microsecond) dd.addSample(0.7, attachments2, t2) // Previous exemplar should be overwritten. - e2 := &metricdata.Exemplar{Value: 0.7, Timestamp: t2, Attachments: attachments2} + e2 := &metricdata.Exemplar{Value: 0.7, Timestamp: t2, Attachments: map[string]interface{}{"k2": "v2"}} want = &DistributionData{ Count: 2, CountPerBucket: []int64{2, 0, 0}, @@ -104,12 +104,12 @@ func TestDistributionData_addSample(t *testing.T) { t.Fatalf("Unexpected DistributionData -got +want: %s", diff) } - attachments3 := map[string]interface{}{"key3": "value3"} + attachments3 := []metricdata.Attachment{metricdata.StringAttachment("k3", "v3")} t3 := t2.Add(time.Microsecond) dd.addSample(1.2, attachments3, t3) // e3 is at another bucket. e2 should still be there. - e3 := &metricdata.Exemplar{Value: 1.2, Timestamp: t3, Attachments: attachments3} + e3 := &metricdata.Exemplar{Value: 1.2, Timestamp: t3, Attachments: map[string]interface{}{"k3": "v3"}} want = &DistributionData{ Count: 3, CountPerBucket: []int64{2, 1, 0}, diff --git a/stats/view/collector.go b/stats/view/collector.go index 8a6a2c0fd..fb0db5a47 100644 --- a/stats/view/collector.go +++ b/stats/view/collector.go @@ -20,6 +20,7 @@ import ( "time" "go.opencensus.io/internal/tagencoding" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/tag" ) @@ -32,7 +33,7 @@ type collector struct { a *Aggregation } -func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) { +func (c *collector) addSample(s string, v float64, attachments []metricdata.Attachment, t time.Time) { aggregator, ok := c.signatures[s] if !ok { aggregator = c.a.newData() diff --git a/stats/view/view.go b/stats/view/view.go index 37f88e1d9..400b98d74 100644 --- a/stats/view/view.go +++ b/stats/view/view.go @@ -153,7 +153,7 @@ func (v *viewInternal) collectedRows() []*Row { return v.collector.collectedRows(v.view.TagKeys) } -func (v *viewInternal) addSample(m *tag.Map, val float64, attachments map[string]interface{}, t time.Time) { +func (v *viewInternal) addSample(m *tag.Map, val float64, attachments []metricdata.Attachment, t time.Time) { if !v.isSubscribed() { return } diff --git a/stats/view/view_to_metric_test.go b/stats/view/view_to_metric_test.go index d9a5a3de0..03af4b214 100644 --- a/stats/view/view_to_metric_test.go +++ b/stats/view/view_to_metric_test.go @@ -61,9 +61,6 @@ var ( aggL *Aggregation buckOpt *metricdata.BucketOptions - // exemplar objects. - attachments metricdata.Attachments - // views and descriptors viewTypeFloat64Distribution *View viewTypeInt64Distribution *View diff --git a/stats/view/worker.go b/stats/view/worker.go index 37279b39e..58d802c14 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -106,7 +106,7 @@ func RetrieveData(viewName string) ([]*Row, error) { return resp.rows, resp.err } -func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { +func record(tags *tag.Map, ms interface{}, attachments []metricdata.Attachment) { req := &recordReq{ tm: tags, ms: ms.([]stats.Measurement), diff --git a/stats/view/worker_commands.go b/stats/view/worker_commands.go index ba6203a50..51aa857c1 100644 --- a/stats/view/worker_commands.go +++ b/stats/view/worker_commands.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "go.opencensus.io/metric/metricdata" "go.opencensus.io/stats" "go.opencensus.io/stats/internal" "go.opencensus.io/tag" @@ -148,7 +149,7 @@ func (cmd *retrieveDataReq) handleCommand(w *worker) { type recordReq struct { tm *tag.Map ms []stats.Measurement - attachments map[string]interface{} + attachments []metricdata.Attachment t time.Time }