Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support a telegraf.Metric.Split function #2126

Merged
merged 1 commit into from
Dec 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats.
- [#1348](https://github.com/influxdata/telegraf/issues/1348): Telegraf "internal" plugin for collecting stats on itself.
- [#2127](https://github.com/influxdata/telegraf/pull/2127): Update Go version to 1.7.4.
- [#2126](https://github.com/influxdata/telegraf/pull/2126): Support a metric.Split function.

### Bugfixes

Expand Down
4 changes: 4 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Metric interface {
Serialize() []byte
String() string // convenience function for string(Serialize())
Copy() Metric
// Split will attempt to return multiple metrics with the same timestamp
// whose string representations are no longer than maxSize.
// Metrics with a single field may exceed the requested size.
Split(maxSize int) []Metric

// Tag functions
HasTag(key string) bool
Expand Down
75 changes: 65 additions & 10 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,57 @@ func (m *metric) Serialize() []byte {
return tmp
}

func (m *metric) Split(maxSize int) []telegraf.Metric {
if m.Len() < maxSize {
return []telegraf.Metric{m}
}
var out []telegraf.Metric

// constant number of bytes for each metric (in addition to field bytes)
constant := len(m.name) + len(m.tags) + len(m.t) + 3
// currently selected fields
fields := make([]byte, 0, maxSize)

i := 0
for {
if i >= len(m.fields) {
// hit the end of the field byte slice
if len(fields) > 0 {
out = append(out, copyWith(m.name, m.tags, fields, m.t))
}
break
}

// find the end of the next field
j := indexUnescapedByte(m.fields[i:], ',')
if j == -1 {
j = len(m.fields)
} else {
j += i
}

// if true, then we need to create a metric _not_ including the currently
// selected field
if len(m.fields[i:j])+len(fields)+constant > maxSize {
// if false, then we'll create a metric including the currently
// selected field anyways. This means that the given maxSize is too
// small for a single field to fit.
if len(fields) > 0 {
out = append(out, copyWith(m.name, m.tags, fields, m.t))
}

fields = make([]byte, 0, maxSize)
}
if len(fields) > 0 {
fields = append(fields, ',')
}
fields = append(fields, m.fields[i:j]...)

i = j + 1
}
return out
}

func (m *metric) Fields() map[string]interface{} {
fieldMap := map[string]interface{}{}
i := 0
Expand Down Expand Up @@ -380,17 +431,21 @@ func (m *metric) RemoveField(key string) error {
}

func (m *metric) Copy() telegraf.Metric {
mOut := metric{
name: make([]byte, len(m.name)),
tags: make([]byte, len(m.tags)),
fields: make([]byte, len(m.fields)),
t: make([]byte, len(m.t)),
return copyWith(m.name, m.tags, m.fields, m.t)
}

func copyWith(name, tags, fields, t []byte) telegraf.Metric {
out := metric{
name: make([]byte, len(name)),
tags: make([]byte, len(tags)),
fields: make([]byte, len(fields)),
t: make([]byte, len(t)),
}
copy(mOut.name, m.name)
copy(mOut.tags, m.tags)
copy(mOut.fields, m.fields)
copy(mOut.t, m.t)
return &mOut
copy(out.name, name)
copy(out.tags, tags)
copy(out.fields, fields)
copy(out.t, t)
return &out
}

func (m *metric) HashID() uint64 {
Expand Down
15 changes: 15 additions & 0 deletions metric/metric_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ func BenchmarkAddTag(b *testing.B) {
s = string(mt.String())
}

func BenchmarkSplit(b *testing.B) {
var mt telegraf.Metric
mt = &metric{
name: []byte("cpu"),
tags: []byte(",host=localhost"),
fields: []byte("a=101,b=10i,c=10101,d=101010,e=42"),
t: []byte("1480614053000000000"),
}
var metrics []telegraf.Metric
for n := 0; n < b.N; n++ {
metrics = mt.Split(60)
}
s = string(metrics[0].String())
}

func BenchmarkTags(b *testing.B) {
for n := 0; n < b.N; n++ {
var mt, _ = New("test_metric",
Expand Down
144 changes: 144 additions & 0 deletions metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metric
import (
"fmt"
"math"
"regexp"
"testing"
"time"

Expand Down Expand Up @@ -434,6 +435,149 @@ func TestNewCounterMetric(t *testing.T) {
assert.Equal(t, now.UnixNano(), m.UnixNano())
}

// test splitting metric into various max lengths
func TestSplitMetric(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

split80 := m.Split(80)
assert.Len(t, split80, 2)

split70 := m.Split(70)
assert.Len(t, split70, 3)

split60 := m.Split(60)
assert.Len(t, split60, 4)
}

// test splitting metric into various max lengths
// use a simple regex check to verify that the split metrics are valid
func TestSplitMetric_RegexVerify(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"foo": float64(98934259085),
"bar": float64(19385292),
"number": float64(19385292),
"another": float64(19385292),
"n": float64(19385292),
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

// verification regex
re := regexp.MustCompile(`cpu,host=localhost \w+=\d+(,\w+=\d+)* 1480940990034083306`)

split90 := m.Split(90)
assert.Len(t, split90, 2)
for _, splitM := range split90 {
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
}

split70 := m.Split(70)
assert.Len(t, split70, 3)
for _, splitM := range split70 {
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
}

split20 := m.Split(20)
assert.Len(t, split20, 5)
for _, splitM := range split20 {
assert.True(t, re.Match(splitM.Serialize()), splitM.String())
}
}

// test splitting metric even when given length is shorter than
// shortest possible length
// Split should split metric as short as possible, ie, 1 field per metric
func TestSplitMetric_TooShort(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

split := m.Split(10)
assert.Len(t, split, 5)
strings := make([]string, 5)
for i, splitM := range split {
strings[i] = splitM.String()
}

assert.Contains(t, strings, "cpu,host=localhost float=100001 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost int=100001i 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost bool=true 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost false=false 1480940990034083306\n")
assert.Contains(t, strings, "cpu,host=localhost string=\"test\" 1480940990034083306\n")
}

func TestSplitMetric_NoOp(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
"int": int64(100001),
"bool": true,
"false": false,
"string": "test",
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

split := m.Split(1000)
assert.Len(t, split, 1)
assert.Equal(t, m, split[0])
}

func TestSplitMetric_OneField(t *testing.T) {
now := time.Unix(0, 1480940990034083306)
tags := map[string]string{
"host": "localhost",
}
fields := map[string]interface{}{
"float": float64(100001),
}
m, err := New("cpu", tags, fields, now)
assert.NoError(t, err)

assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", m.String())

split := m.Split(1000)
assert.Len(t, split, 1)
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())

split = m.Split(1)
assert.Len(t, split, 1)
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())

split = m.Split(40)
assert.Len(t, split, 1)
assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
}

func TestNewMetricAggregate(t *testing.T) {
now := time.Now()

Expand Down