Skip to content

Commit

Permalink
Support a telegraf.Metric.Split function
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Dec 5, 2016
1 parent 5a3f2e6 commit a3a40b8
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 10 deletions.
4 changes: 4 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Metric interface {
Serialize() []byte
String() string // convenience function for string(Serialize())
Copy() Metric
// Split will attempt to return multiple metrics with the same timestamp
// whose string representations are no longer than maxSize.
// Metrics with a single field may exceed the requested size.
Split(maxSize int) []Metric

// Tag functions
HasTag(key string) bool
Expand Down
75 changes: 65 additions & 10 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,57 @@ func (m *metric) Serialize() []byte {
return tmp
}

func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize {
return []telegraf.Metric{m}
}
var out []telegraf.Metric

// constant number of bytes for each metric (in addition to field bytes)
constant := len(m.name) + len(m.tags) + len(m.t) + 3
// currently selected fields
fields := make([]byte, 0, maxSize)

i := 0
for {
if i >= len(m.fields) {
// hit the end of the field byte slice
if len(fields) > 0 {
out = append(out, copyWith(m.name, m.tags, fields, m.t))
}
break
}

// find the end of the next field
j := indexUnescapedByte(m.fields[i:], ',')
if j == -1 {
j = len(m.fields)
} else {
j += i
}

// if true, then we need to create a metric _not_ including the currently
// selected field
if len(m.fields[i:j])+len(fields)+constant > maxSize {
// if false, then we'll create a metric including the currently
// selected field anyways. This means that the given maxSize is too
// small for a single field to fit.
if len(fields) > 0 {
out = append(out, copyWith(m.name, m.tags, fields, m.t))
}

fields = make([]byte, 0, maxSize)
}
if len(fields) > 0 {
fields = append(fields, ',')
}
fields = append(fields, m.fields[i:j]...)

i = j + 1
}
return out
}

func (m *metric) Fields() map[string]interface{} {
fieldMap := map[string]interface{}{}
i := 0
Expand Down Expand Up @@ -380,17 +431,21 @@ func (m *metric) RemoveField(key string) error {
}

func (m *metric) Copy() telegraf.Metric {
mOut := metric{
name: make([]byte, len(m.name)),
tags: make([]byte, len(m.tags)),
fields: make([]byte, len(m.fields)),
t: make([]byte, len(m.t)),
return copyWith(m.name, m.tags, m.fields, m.t)
}

func copyWith(name, tags, fields, t []byte) telegraf.Metric {
out := metric{
name: make([]byte, len(name)),
tags: make([]byte, len(tags)),
fields: make([]byte, len(fields)),
t: make([]byte, len(t)),
}
copy(mOut.name, m.name)
copy(mOut.tags, m.tags)
copy(mOut.fields, m.fields)
copy(mOut.t, m.t)
return &mOut
copy(out.name, name)
copy(out.tags, tags)
copy(out.fields, fields)
copy(out.t, t)
return &out
}

func (m *metric) HashID() uint64 {
Expand Down
15 changes: 15 additions & 0 deletions metric/metric_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ func BenchmarkAddTag(b *testing.B) {
s = string(mt.String())
}

func BenchmarkSplit(b *testing.B) {
var mt telegraf.Metric
mt = &metric{
name: []byte("cpu"),
tags: []byte(",host=localhost"),
fields: []byte("a=101"),
t: []byte("1480614053000000000"),
}
var metrics []telegraf.Metric
for n := 0; n < b.N; n++ {
metrics = mt.Split(20)
}
s = string(metrics[0].String())
}

func BenchmarkTags(b *testing.B) {
for n := 0; n < b.N; n++ {
var mt, _ = New("test_metric",
Expand Down
144 changes: 144 additions & 0 deletions metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metric
import (
"fmt"
"math"
"regexp"
"testing"
"time"

Expand Down Expand Up @@ -434,6 +435,149 @@ func TestNewCounterMetric(t *testing.T) {
assert.Equal(t, now.UnixNano(), m.UnixNano())
}

// test splitting metric into various max lengths
func TestSplitMetric(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

split80 := m.Split(80)
assert.Len(t, split80, 2)

split70 := m.Split(70)
assert.Len(t, split70, 3)

split60 := m.Split(60)
assert.Len(t, split60, 4)
}

// test splitting metric into various max lengths
// use a simple regex check to verify that the split metrics are valid
func TestSplitMetric_RegexVerify(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"foo": float64(98934259085),
"bar": float64(19385292),
"number": float64(19385292),
"another": float64(19385292),
"n": float64(19385292),
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

// verification regex
re := regexp.MustCompile(`cpu,host=localhost \w+=\d+(,\w+=\d+)* 1480940990034083306`)

split90 := m.Split(90)
assert.Len(t, split90, 2)
for _, splitM := range split90 {
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
}

split70 := m.Split(70)
assert.Len(t, split70, 3)
for _, splitM := range split70 {
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
}

split20 := m.Split(20)
assert.Len(t, split20, 5)
for _, splitM := range split20 {
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
}
}

// test splitting metric even when given length is shorter than
// shortest possible length
// Split should split metric as short as possible, ie, 1 field per metric
func TestSplitMetric_TooShort(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

split := m.Split(10)
assert.Len(t, split, 5)
strings := make([]string, 5)
for i, splitM := range split {
strings[i] = splitM.String()
}

assert.Contains(t, strings, "cpu,host=localhost float=100001 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost int=100001i 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost bool=true 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost false=false 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost string=\"test\" 1480940990034083306\n")
}

func TestSplitMetric_NoOp(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

split := m.Split(1000)
assert.Len(t, split, 1)
assert.Equal(t, m, split[0])
}

func TestSplitMetric_OneField(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", m.String())

split := m.Split(1000)
assert.Len(t, split, 1)
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())

split = m.Split(1)
assert.Len(t, split, 1)
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())

split = m.Split(40)
assert.Len(t, split, 1)
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
}

func TestNewMetricAggregate(t *testing.T) {
now := time.Now()

Expand Down

0 comments on commit a3a40b8

Please sign in to comment.