Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

[DO NOT MERGE] Exemplar: Refactor Attachment to be similar to Attribute. #1085

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions metric/metricdata/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ package metricdata

import (
"time"

"go.opencensus.io/trace"
)

// Exemplars keys.
const (
AttachmentKeySpanContext = "SpanContext"
)

// Exemplar is an example data point associated with each bucket of a
Expand All @@ -24,10 +31,23 @@ import (
// Their purpose is to provide an example of the kind of thing
// (request, RPC, trace span, etc.) that resulted in that measurement.
type Exemplar struct {
Value float64 // the value that was recorded
Timestamp time.Time // the time the value was recorded
Attachments Attachments // attachments (if any)
Value float64 // the value that was recorded
Timestamp time.Time // the time the value was recorded
Attachments map[string]interface{} // attachments (if any)
}

// Attachment is a key-value pair associated with a recorded example data point.
type Attachment struct {
Key string
Copy link
Contributor Author

@songy23 songy23 Mar 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Key and Value have to be exposed because other packages (stats, exporters) need to access them.

Value interface{}
}

// SpanContextAttachment returns a span context valued attachment.
func SpanContextAttachment(key string, value trace.SpanContext) Attachment {
return Attachment{Key: key, Value: value}
}

// Attachments is a map of extra values associated with a recorded data point.
type Attachments map[string]interface{}
// StringAttachment returns a string attachment.
func StringAttachment(key string, value string) Attachment {
return Attachment{Key: key, Value: value}
}
3 changes: 2 additions & 1 deletion stats/internal/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package internal

import (
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/tag"
)

// DefaultRecorder will be called for each Record call.
var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments map[string]interface{})
var DefaultRecorder func(tags *tag.Map, measurement interface{}, attachments []metricdata.Attachment)

// SubscriptionReporter reports when a view subscribed with a measure.
var SubscriptionReporter func(measure string)
21 changes: 18 additions & 3 deletions stats/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package stats
import (
"context"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats/internal"
"go.opencensus.io/tag"
)
Expand All @@ -33,6 +34,12 @@ func init() {
// Record records one or multiple measurements with the same context at once.
// If there are any tags in the context, measurements will be tagged with them.
func Record(ctx context.Context, ms ...Measurement) {
RecordWithAttachments(ctx, nil, ms...)
}

// RecordWithAttachments records measurements and the given exemplar attachments against context.
// If there are any tags in the context, measurements will be tagged with them.
func RecordWithAttachments(ctx context.Context, attachments []metricdata.Attachment, ms ...Measurement) {
recorder := internal.DefaultRecorder
if recorder == nil {
return
Expand All @@ -50,8 +57,7 @@ func Record(ctx context.Context, ms ...Measurement) {
if !record {
return
}
// TODO(songy23): fix attachments.
recorder(tag.FromContext(ctx), ms, map[string]interface{}{})
recorder(tag.FromContext(ctx), ms, attachments)
}

// RecordWithTags records one or multiple measurements at once.
Expand All @@ -60,10 +66,19 @@ func Record(ctx context.Context, ms ...Measurement) {
// RecordWithTags is useful if you want to record with tag mutations but don't want
// to propagate the mutations in the context.
func RecordWithTags(ctx context.Context, mutators []tag.Mutator, ms ...Measurement) error {
return RecordWithTagsAndAttachments(ctx, mutators, nil, ms...)
}

// RecordWithTagsAndAttachments records measurements and the given exemplar attachments at once.
//
// Measurements will be tagged with the tags in the context mutated by the mutators.
// RecordWithTags is useful if you want to record with tag mutations but don't want
// to propagate the mutations in the context.
func RecordWithTagsAndAttachments(ctx context.Context, mutators []tag.Mutator, attachments []metricdata.Attachment, ms ...Measurement) error {
ctx, err := tag.New(ctx, mutators...)
if err != nil {
return err
}
Record(ctx, ms...)
RecordWithAttachments(ctx, attachments, ms...)
return nil
}
95 changes: 95 additions & 0 deletions stats/record_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stats_test

import (
"context"
"log"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)

var (
tid = trace.TraceID{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 4, 8, 16, 32, 64, 128}
sid = trace.SpanID{1, 2, 4, 8, 16, 32, 64, 128}
spanCtx = trace.SpanContext{
TraceID: tid,
SpanID: sid,
TraceOptions: 1,
}
)

func TestRecordWithAttachments(t *testing.T) {
k1, _ := tag.NewKey("k1")
k2, _ := tag.NewKey("k2")
distribution := view.Distribution(5, 10)
m := stats.Int64("TestRecordWithAttachments/m1", "", stats.UnitDimensionless)
v := &view.View{
Name: "test_view",
TagKeys: []tag.Key{k1, k2},
Measure: m,
Aggregation: distribution,
}
view.SetReportingPeriod(100 * time.Millisecond)
if err := view.Register(v); err != nil {
log.Fatalf("Failed to register views: %v", err)
}

attachments := []metricdata.Attachment{metricdata.SpanContextAttachment(metricdata.AttachmentKeySpanContext, spanCtx)}
stats.RecordWithAttachments(context.Background(), attachments, m.M(12))
rows, err := view.RetrieveData("test_view")
if err != nil {
t.Errorf("Failed to retrieve data %v", err)
}
if len(rows) == 0 {
t.Errorf("No data was recorded.")
}
data := rows[0].Data
dis, ok := data.(*view.DistributionData)
if !ok {
t.Errorf("want DistributionData, got %+v", data)
}
wantBuckets := []int64{0, 0, 1}
if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
}
for i, e := range dis.ExemplarsPerBucket {
// Exemplar slice should be [nil, nil, exemplar]
if i != 2 && e != nil {
t.Errorf("want nil exemplar, got %v", e)
}
if i == 2 {
wantExemplar := &metricdata.Exemplar{Value: 12, Attachments: map[string]interface{}{metricdata.AttachmentKeySpanContext: spanCtx}}
if diff := cmpExemplar(e, wantExemplar); diff != "" {
t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
}
}
}
}

// Compare exemplars while ignoring exemplar timestamp, since timestamp is non-deterministic.
func cmpExemplar(got, want *metricdata.Exemplar) string {
return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
}
20 changes: 12 additions & 8 deletions stats/view/aggregation_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// Mosts users won't directly access aggregration data.
type AggregationData interface {
isAggregationData() bool
addSample(v float64, attachments map[string]interface{}, t time.Time)
addSample(v float64, attachments []metricdata.Attachment, t time.Time)
clone() AggregationData
equal(other AggregationData) bool
toPoint(t metricdata.Type, time time.Time) metricdata.Point
Expand All @@ -45,7 +45,7 @@ type CountData struct {

func (a *CountData) isAggregationData() bool { return true }

func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) {
func (a *CountData) addSample(_ float64, _ []metricdata.Attachment, _ time.Time) {
a.Value = a.Value + 1
}

Expand Down Expand Up @@ -81,7 +81,7 @@ type SumData struct {

func (a *SumData) isAggregationData() bool { return true }

func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
func (a *SumData) addSample(v float64, _ []metricdata.Attachment, _ time.Time) {
a.Value += v
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func (a *DistributionData) variance() float64 {
func (a *DistributionData) isAggregationData() bool { return true }

// TODO(songy23): support exemplar attachments.
func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) {
func (a *DistributionData) addSample(v float64, attachments []metricdata.Attachment, t time.Time) {
if v < a.Min {
a.Min = v
}
Expand All @@ -172,7 +172,7 @@ func (a *DistributionData) addSample(v float64, attachments map[string]interface
a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean)
}

func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) {
func (a *DistributionData) addToBucket(v float64, attachments []metricdata.Attachment, t time.Time) {
var count *int64
var i int
var b float64
Expand All @@ -192,14 +192,18 @@ func (a *DistributionData) addToBucket(v float64, attachments map[string]interfa
}
}

func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar {
func getExemplar(v float64, attachments []metricdata.Attachment, t time.Time) *metricdata.Exemplar {
if len(attachments) == 0 {
return nil
}
attachmentMap := make(map[string]interface{})
for _, attachment := range attachments {
attachmentMap[attachment.Key] = attachment.Value
}
return &metricdata.Exemplar{
Value: v,
Timestamp: t,
Attachments: attachments,
Attachments: attachmentMap,
}
}

Expand Down Expand Up @@ -265,7 +269,7 @@ func (l *LastValueData) isAggregationData() bool {
return true
}

func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) {
func (l *LastValueData) addSample(v float64, _ []metricdata.Attachment, _ time.Time) {
l.Value = v
}

Expand Down
12 changes: 6 additions & 6 deletions stats/view/aggregation_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ func TestDataClone(t *testing.T) {

func TestDistributionData_addSample(t *testing.T) {
dd := newDistributionData([]float64{1, 2})
attachments1 := map[string]interface{}{"key1": "value1"}
attachments1 := []metricdata.Attachment{metricdata.StringAttachment("k", "v")}
t1 := time.Now()
dd.addSample(0.5, attachments1, t1)

e1 := &metricdata.Exemplar{Value: 0.5, Timestamp: t1, Attachments: attachments1}
e1 := &metricdata.Exemplar{Value: 0.5, Timestamp: t1, Attachments: map[string]interface{}{"k": "v"}}
want := &DistributionData{
Count: 1,
CountPerBucket: []int64{1, 0, 0},
Expand All @@ -85,12 +85,12 @@ func TestDistributionData_addSample(t *testing.T) {
t.Fatalf("Unexpected DistributionData -got +want: %s", diff)
}

attachments2 := map[string]interface{}{"key2": "value2"}
attachments2 := []metricdata.Attachment{metricdata.StringAttachment("k2", "v2")}
t2 := t1.Add(time.Microsecond)
dd.addSample(0.7, attachments2, t2)

// Previous exemplar should be overwritten.
e2 := &metricdata.Exemplar{Value: 0.7, Timestamp: t2, Attachments: attachments2}
e2 := &metricdata.Exemplar{Value: 0.7, Timestamp: t2, Attachments: map[string]interface{}{"k2": "v2"}}
want = &DistributionData{
Count: 2,
CountPerBucket: []int64{2, 0, 0},
Expand All @@ -104,12 +104,12 @@ func TestDistributionData_addSample(t *testing.T) {
t.Fatalf("Unexpected DistributionData -got +want: %s", diff)
}

attachments3 := map[string]interface{}{"key3": "value3"}
attachments3 := []metricdata.Attachment{metricdata.StringAttachment("k3", "v3")}
t3 := t2.Add(time.Microsecond)
dd.addSample(1.2, attachments3, t3)

// e3 is at another bucket. e2 should still be there.
e3 := &metricdata.Exemplar{Value: 1.2, Timestamp: t3, Attachments: attachments3}
e3 := &metricdata.Exemplar{Value: 1.2, Timestamp: t3, Attachments: map[string]interface{}{"k3": "v3"}}
want = &DistributionData{
Count: 3,
CountPerBucket: []int64{2, 1, 0},
Expand Down
3 changes: 2 additions & 1 deletion stats/view/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"go.opencensus.io/internal/tagencoding"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/tag"
)

Expand All @@ -32,7 +33,7 @@ type collector struct {
a *Aggregation
}

func (c *collector) addSample(s string, v float64, attachments map[string]interface{}, t time.Time) {
func (c *collector) addSample(s string, v float64, attachments []metricdata.Attachment, t time.Time) {
aggregator, ok := c.signatures[s]
if !ok {
aggregator = c.a.newData()
Expand Down
2 changes: 1 addition & 1 deletion stats/view/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (v *viewInternal) collectedRows() []*Row {
return v.collector.collectedRows(v.view.TagKeys)
}

func (v *viewInternal) addSample(m *tag.Map, val float64, attachments map[string]interface{}, t time.Time) {
func (v *viewInternal) addSample(m *tag.Map, val float64, attachments []metricdata.Attachment, t time.Time) {
if !v.isSubscribed() {
return
}
Expand Down
2 changes: 1 addition & 1 deletion stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func RetrieveData(viewName string) ([]*Row, error) {
return resp.rows, resp.err
}

func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
func record(tags *tag.Map, ms interface{}, attachments []metricdata.Attachment) {
req := &recordReq{
tm: tags,
ms: ms.([]stats.Measurement),
Expand Down
3 changes: 2 additions & 1 deletion stats/view/worker_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"time"

"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats"
"go.opencensus.io/stats/internal"
"go.opencensus.io/tag"
Expand Down Expand Up @@ -148,7 +149,7 @@ func (cmd *retrieveDataReq) handleCommand(w *worker) {
type recordReq struct {
tm *tag.Map
ms []stats.Measurement
attachments map[string]interface{}
attachments []metricdata.Attachment
t time.Time
}

Expand Down