Splunk Metrics serializer #4339

Merged (13 commits) on Sep 11, 2018
7 changes: 7 additions & 0 deletions plugins/serializers/registry.go
@@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/plugins/serializers/splunkmetric"
)

// SerializerOutput is an interface for output plugins that are able to
@@ -73,6 +74,8 @@ func NewSerializer(config *Config) (Serializer, error) {
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport)
case "json":
serializer, err = NewJsonSerializer(config.TimestampUnits)
case "splunkmetric":
serializer, err = NewSplunkmetricSerializer(config.TimestampUnits)
Contributor:

Splunk only supports seconds. The serializer should not be using config.TimestampUnits.

Contributor (Author):

Splunk absolutely supports sub-second timestamps. From the default datetime.xml:

```xml
<define name="_utcepoch" extract="utcepoch, subsecond">
    <!-- update regex before '2017' -->
    <text><![CDATA[((?<=^|[\s#,"=\(\[\|\{])(?:1[012345]|9)\d{8}|^@[\da-fA-F]{16,24})(?:\.?(\d{1,6}))?(?![\d\(])]]></text>
</define>
```

Contributor:

I didn't say it doesn't. It only supports seconds as the unit. It supports more than that as the precision. Unit != precision.
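To make the unit-vs-precision distinction concrete, here is a minimal sketch (illustrative only, not code from this PR) of emitting seconds as the unit while a float64 carries the sub-second precision:

```go
// Minimal sketch: Splunk's "time" field uses seconds as the unit; a float64
// value carries sub-second precision without needing any other unit.
package main

import (
	"fmt"
	"time"
)

func main() {
	ts := time.Unix(1529708430, 123456789)
	// Convert to float64 before dividing so the fractional part survives.
	seconds := float64(ts.UnixNano()) / float64(time.Second)
	fmt.Printf("%.3f\n", seconds) // 1529708430.123
}
```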

default:
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
}
@@ -83,6 +86,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits)
}

func NewSplunkmetricSerializer(timestampUnits time.Duration) (Serializer, error) {
return splunkmetric.NewSerializer(timestampUnits)
}

func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
var sort influx.FieldSortOrder
if config.InfluxSortFields {
85 changes: 85 additions & 0 deletions plugins/serializers/splunkmetric/README.md
@@ -0,0 +1,85 @@
# Splunk Metrics serialzier
Contributor:

spelling


This serializer formats and outputs metric data in a format that can be consumed by a Splunk metrics index. It can be used to write to a file using the file output, or to send metrics to an HTTP Event Collector (HEC) using the standard telegraf HTTP output. If you're using the HTTP output, this serializer knows how to batch the metrics so you don't end up with one HTTP POST per metric.

The data is output in a format that conforms to the Splunk HEC JSON format described here: [Send metrics in JSON format](http://dev.splunk.com/view/event-collector/SP-CAAAFDN).

An example event looks like:
```javascript
{
"time": 1529708430,
"event": "metric",
"host": "patas-mbp",
"fields": {
"_value": 0.6,
"cpu": "cpu0",
"dc": "mobile",
"metric_name": "cpu.usage_user",
"user": "ronnocol"
}
}
```
In the above snippet, the following keys are dimensions:
* cpu
* dc
* user

## Using with the HTTP output

To send this data to a Splunk HEC, you can use the HTTP output. There are some custom headers you need to add to manage the HEC authorization; here's a sample config for an HTTP output:

```toml
[[outputs.http]]
# ## URL is the address to send metrics to
```

Contributor:

again, leading # isn't necessary

```toml
url = "https://localhost:8088/services/collector"
#
# ## Timeout for HTTP message
# # timeout = "5s"
#
# ## HTTP method, one of: "POST" or "PUT"
# # method = "POST"
#
# ## HTTP Basic Auth credentials
# # username = "username"
# # password = "pa$$word"
#
# ## Optional TLS Config
# # tls_ca = "/etc/telegraf/ca.pem"
# # tls_cert = "/etc/telegraf/cert.pem"
# # tls_key = "/etc/telegraf/key.pem"
# ## Use TLS but skip chain & host verification
# # insecure_skip_verify = false
#
# ## Data format to output.
# ## Each data format has its own unique set of configuration options, read
# ## more about them here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "splunkmetric"
#
# ## Additional HTTP headers
[outputs.http.headers]
# # Should be set manually to "application/json" for json data_format
Content-Type = "application/json"
Authorization = "Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
X-Splunk-Request-Channel = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
```

## Overrides
You can override the default values for the HEC token you are using by adding additional tags to the config file.

The following aspects of the token can be overridden with tags:
* index
* source

You can either use `[global_tags]` or use a more advanced configuration as documented [here](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md).

For example, this configuration overrides the index just for the cpu metric:
```toml
[[inputs.cpu]]
percpu = false
totalcpu = true
[inputs.cpu.tags]
index = "cpu_metrics"
```
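With that override in place, events from the cpu input would carry a top-level index key, roughly like this (illustrative example; the field values are invented):

```javascript
{
  "time": 1529708430,
  "event": "metric",
  "host": "patas-mbp",
  "index": "cpu_metrics",
  "fields": {
    "_value": 42.0,
    "metric_name": "cpu.usage_user"
  }
}
```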

135 changes: 135 additions & 0 deletions plugins/serializers/splunkmetric/splunkmetric.go
@@ -0,0 +1,135 @@
package splunkmetric

import (
"encoding/json"
"errors"
"time"

"github.com/influxdata/telegraf"
)

type serializer struct {
TimestampUnits time.Duration
}

func NewSerializer(timestampUnits time.Duration) (*serializer, error) {
s := &serializer{
TimestampUnits: truncateDuration(timestampUnits),
}
return s, nil
}

func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
var serialized string

m, err := s.createObject(metric)
if err == nil {
serialized = m + "\n"
}

return []byte(serialized), nil
}

func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
Contributor:

SerializeBatch seems like it's just a reinvention of json.Encoder. It might be better to use the existing standard lib functionality. It should also simplify your code a lot.
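A rough sketch of that suggestion (hypothetical shape; hecObject and encodeBatch are stand-ins for whatever the refactored serializer exposes): write each object through a single json.Encoder into one bytes.Buffer, which also appends the trailing newline.

```go
// Hypothetical sketch of the json.Encoder approach, not code from this PR.
package splunkmetric

import (
	"bytes"
	"encoding/json"
)

type hecObject struct {
	Time   float64                `json:"time"`
	Event  string                 `json:"event"`
	Fields map[string]interface{} `json:"fields"`
}

func encodeBatch(objects []hecObject) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, obj := range objects {
		// Encode writes the JSON plus a trailing newline straight into buf,
		// so nothing is round-tripped through intermediate strings.
		if err := enc.Encode(obj); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
```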


var serialized string

var objects []string

for _, metric := range metrics {
m, err := s.createObject(metric)
if err == nil {
Contributor:

The error should be logged somewhere when not nil. Otherwise metrics are going to be getting dropped and the user won't have any idea why.
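Something as small as this would cover it (a sketch; it assumes the standard log package is imported, and the "D!" level prefix is a telegraf logging convention, not an API):

```go
m, err := s.createObject(metric)
if err != nil {
	// Log and skip, rather than silently dropping the metric.
	log.Printf("D! [serializers.splunkmetric] dropping metric: %v", err)
	continue
}
objects = append(objects, m)
```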

objects = append(objects, m)
}
}

for _, m := range objects {
serialized = serialized + m + "\n"
Contributor:

It would be better if serialized were a byte slice. Every time you append to a string, a new string has to be allocated. If your batches are large (in terms of count or size), this can suck up a lot of memory. With byte slice, an allocation is performed only if the length of the slice exceeds the capacity.
Ditto for other places where this is stored in a string (e.g. line 23).

Also I'm not seeing much point of the objects slice. You should be able to append to serialized directly.

}

return []byte(serialized), nil
}

func (s *serializer) createObject(metric telegraf.Metric) (metricString string, err error) {

Contributor:

We do a bad job of explaining what is guaranteed in a telegraf.Metric, and actually as I write this I notice some additional checks I need to add.

Here is a brief rundown of things you may or may not need to check:

  • metric name may be an empty string
  • zero or more tags, tag keys are any non-empty string, tag values may be empty strings
  • zero, yes zero, or more fields; field keys are any non-empty string; field values may be any float64 (including NaN, +/-Inf), int64, uint64, string, or bool
  • time is any time.Time.

The part about tag/field keys not being empty strings is not true right now, but after writing this I am going to ensure this is the case in 1.8.

Contributor (Author):

Thanks, this actually caused me to re-examine the serializer with several different inputs, and I found a case in which metrics were lost (dropped), so I'm refactoring some of the code to deal with that. (Also, it's been a crazy week... so I hope to get this done over the next few days.)
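One concrete hazard from the list above: encoding/json returns an error for NaN and +/-Inf float64 values, so a stricter check along these lines (a sketch of a possible refactor, not code from this PR) would keep a single bad field from failing the whole object:

```go
import "math"

// Hypothetical stricter check: reject values json.Marshal cannot encode.
func verifyValueStrict(v interface{}) bool {
	switch t := v.(type) {
	case string:
		return false
	case float64:
		return !math.IsNaN(t) && !math.IsInf(t, 0)
	}
	return true
}
```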

/* Splunk supports one metric per line and has the following required names:
Contributor:

This is not entirely accurate. Splunk's http event collector is not line-oriented. It's object oriented. You can shove multiple JSON objects on a single line and it's happy to consume them.
See this example: https://docs.splunk.com/Documentation/Splunk/7.1.1/Data/HTTPEventCollectortokenmanagement#Send_multiple_events_to_HEC_in_one_request

{"event": "Pony 1 has left the barn"}{"event": "Pony 2 has left the barn"}{"event": "Pony 3 has left the barn", "nested": {"key1": "value1"}}

Contributor (Author):

I agree, it's not line oriented, but it doesn't support reading an array of JSON objects as metrics. This is OK:

{"event": "Pony 1 has left the barn"}{"event": "Pony 2 has left the barn"}{"event": "Pony 3 has left the barn", "nested": {"key1": "value1"}}

This is not OK:

[{"event": "Pony 1 has left the barn"}{"event": "Pony 2 has left the barn"}{"event": "Pony 3 has left the barn", "nested": {"key1": "value1"}}]

Contributor:

My point was that developer documentation should be accurate. We shouldn't tell the developers it's "one metric per line" if it's not.

** metric_name: The name of the metric
** _value: The value for the metric
** _time: The timestamp for the metric
** All other index fields become dimensions.
Contributor:

I personally don't think this is a good structure to follow. This will prevent using this serializer with pretty much every single input plugin, as none of them follow this format. I think the implementation over on #4185 has a much more flexible implementation.

Contributor (Author):

The comment is wrong, it should be time, not _time (I'll fix), but metric_name and _value are required fields for the metrics store. Naming the fields this way means you don't need a custom props.conf or transforms.conf. In fact, this is the same naming convention used in the PR you reference. The point of this serializer is to produce metrics in this format so that they can be used with the generic, well-tested HTTP output, or with the file output if you're running telegraf on a machine that runs a Splunk forwarder.

*/
type HECTimeSeries struct {
Time float64 `json:"time"`
Event string `json:"event"`
Host string `json:"host,omitempty"`
Index string `json:"index,omitempty"`
Source string `json:"source,omitempty"`
Fields map[string]interface{} `json:"fields"`
}

dataGroup := HECTimeSeries{}

for k, v := range metric.Fields() {
Contributor:

Use metric.FieldList() so that no allocation is performed.

Contributor (Author):

Part of the refactoring I mentioned (thanks for the suggestion).


if !verifyValue(v) {
err = errors.New("can not parse value")
return "", err
}

obj := map[string]interface{}{}
obj["metric_name"] = metric.Name() + "." + k
obj["_value"] = v

dataGroup.Event = "metric"
dataGroup.Time = float64(metric.Time().UnixNano() / int64(s.TimestampUnits))
Contributor:

I think I see what you're intending to do here, but this won't result in the correct behavior. https://play.golang.org/p/VEDBzbF0j0e

If you want to do math between 2 ints, and get a float result, you need to convert the int to float before the arithmetic.
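In other words (a sketch of the problem and one possible fix; the timestamp is illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.Unix(1529708430, 123456789)
	units := 100 * time.Millisecond

	// As written in the PR: integer division truncates first, and the result
	// is "units since epoch" (here, tenths of a second), not seconds.
	wrong := float64(t.UnixNano() / int64(units))

	// Convert before dividing: truncate to the configured precision, then
	// scale by float64 so the result is fractional seconds.
	right := float64(t.Truncate(units).UnixNano()) / float64(time.Second)

	fmt.Println(wrong) // 1.5297084301e+10
	fmt.Println(right) // 1.5297084301e+09
}
```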

dataGroup.Fields = obj

// Break tags out into key(n)=value(t) pairs
for n, t := range metric.Tags() {
if n == "host" {
dataGroup.Host = t
} else if n == "index" {
dataGroup.Index = t
} else if n == "source" {
dataGroup.Source = t
} else {
dataGroup.Fields[n] = t
}
}
dataGroup.Fields["metric_name"] = metric.Name() + "." + k
dataGroup.Fields["_value"] = v
}

metricJson, err := json.Marshal(dataGroup)

if err != nil {
return "", err
}

metricString = string(metricJson)
Contributor:

Picking on the string vs byte slice thing some more, you're converting a byte slice to a string, and then later on converting that string back into a byte slice (up on line 50). It would use less memory to keep it as a byte slice.

return metricString, nil
}

func verifyValue(v interface{}) bool {
switch v.(type) {
case string:
return false
}
return true
}

func truncateDuration(units time.Duration) time.Duration {
// Default precision is 1s
if units <= 0 {
return time.Second
}

// Search for the power of ten less than the duration
d := time.Nanosecond
for {
if d*10 > units {
return d
}
d = d * 10
}
}
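For completeness, a hypothetical test sketch (using the same assert style and imports as the test file below) pinning down truncateDuration's behavior:

```go
func TestTruncateDuration(t *testing.T) {
	assert.Equal(t, time.Second, truncateDuration(0))                             // <= 0 falls back to 1s
	assert.Equal(t, 100*time.Millisecond, truncateDuration(250*time.Millisecond)) // rounds down to a power of ten
	assert.Equal(t, time.Second, truncateDuration(time.Second))
}
```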
108 changes: 108 additions & 0 deletions plugins/serializers/splunkmetric/splunkmetric_test.go
@@ -0,0 +1,108 @@
package splunkmetric

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)

func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
if err != nil {
panic(err)
}
return v
}

func TestSerializeMetricFloat(t *testing.T) {
now := time.Unix(0, 0)
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)
expS := `{"time":0,"event":"metric","fields":{"_value":91.5,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` + "\n"
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricInt(t *testing.T) {
now := time.Unix(0, 0)
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": int64(90),
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)

expS := `{"time":0,"event":"metric","fields":{"_value":90,"cpu":"cpu0","metric_name":"cpu.usage_idle"}}` + "\n"
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeMetricString(t *testing.T) {
now := time.Now()
tags := map[string]string{
"cpu": "cpu0",
}
fields := map[string]interface{}{
"usage_idle": "foobar",
}
m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err)

s, _ := NewSerializer(0)
var buf []byte
buf, err = s.Serialize(m)
assert.NoError(t, err)

expS := ""
assert.Equal(t, string(expS), string(buf))
}

func TestSerializeBatch(t *testing.T) {
m := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
)
n := MustMetric(
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 92.0,
},
time.Unix(0, 0),
),
)

metrics := []telegraf.Metric{m, n}
s, _ := NewSerializer(0)
buf, err := s.SerializeBatch(metrics)
assert.NoError(t, err)

expS := `{"time":0,"event":"metric","fields":{"_value":42,"metric_name":"cpu.value"}}` + "\n" + `{"time":0,"event":"metric","fields":{"_value":92,"metric_name":"cpu.value"}}` + "\n"
assert.Equal(t, string(expS), string(buf))
}