
Splunk Metrics serializer #4339

Merged: 13 commits, Sep 11, 2018
Changes from 11 commits
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_OUTPUT.md
@@ -7,6 +7,7 @@ plugins.
1. [InfluxDB Line Protocol](#influx)
1. [JSON](#json)
1. [Graphite](#graphite)
1. [SplunkMetric](../plugins/serializers/splunkmetric/README.md)

You will be able to identify the plugins with support by the presence of a
`data_format` config option, for example, in the `file` output plugin:
13 changes: 13 additions & 0 deletions internal/config/config.go
@@ -1455,6 +1455,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
        }
    }

    if node, ok := tbl.Fields["hec_routing"]; ok {
        if kv, ok := node.(*ast.KeyValue); ok {
            if b, ok := kv.Value.(*ast.Boolean); ok {
                var err error
                c.HecRouting, err = b.Boolean()
                if err != nil {
                    return nil, err
                }
            }
        }
    }

    delete(tbl.Fields, "influx_max_line_bytes")
    delete(tbl.Fields, "influx_sort_fields")
    delete(tbl.Fields, "influx_uint_support")
@@ -1463,6 +1475,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
    delete(tbl.Fields, "prefix")
    delete(tbl.Fields, "template")
    delete(tbl.Fields, "json_timestamp_units")
    delete(tbl.Fields, "hec_routing")
    return serializers.NewSerializer(c)
}

10 changes: 10 additions & 0 deletions plugins/serializers/registry.go
@@ -9,6 +9,7 @@ import (
    "github.com/influxdata/telegraf/plugins/serializers/graphite"
    "github.com/influxdata/telegraf/plugins/serializers/influx"
    "github.com/influxdata/telegraf/plugins/serializers/json"
    "github.com/influxdata/telegraf/plugins/serializers/splunkmetric"
)

// SerializerOutput is an interface for output plugins that are able to
@@ -60,6 +61,9 @@ type Config struct {

    // Timestamp units to use for JSON formatted output
    TimestampUnits time.Duration

    // Include HEC routing fields for splunkmetric output
    HecRouting bool
}

// NewSerializer a Serializer interface based on the given config.
@@ -73,6 +77,8 @@ func NewSerializer(config *Config) (Serializer, error) {
        serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport)
    case "json":
        serializer, err = NewJsonSerializer(config.TimestampUnits)
    case "splunkmetric":
        serializer, err = NewSplunkmetricSerializer(config.HecRouting)
    default:
        err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
    }
@@ -83,6 +89,10 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
    return json.NewSerializer(timestampUnits)
}

func NewSplunkmetricSerializer(hec_routing bool) (Serializer, error) {
    return splunkmetric.NewSerializer(hec_routing)
}

func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
    var sort influx.FieldSortOrder
    if config.InfluxSortFields {
139 changes: 139 additions & 0 deletions plugins/serializers/splunkmetric/README.md
@@ -0,0 +1,139 @@
# Splunk Metrics serializer

Contributor: spelling

This serializer formats and outputs metric data in a format that can be consumed by a Splunk metrics index.
It can be used to write to a file using the file output, or to send metrics to a HEC using the standard Telegraf HTTP output.

If you're using the HTTP output, this serializer knows how to batch the metrics so you don't end up with one HTTP POST per metric.

The data is output in a format that conforms to the Splunk HEC JSON format as described here:
[Send metrics in JSON format](http://dev.splunk.com/view/event-collector/SP-CAAAFDN).

An example event looks like:
```javascript
{
    "time": 1529708430,
    "event": "metric",
    "host": "patas-mbp",
    "fields": {
        "_value": 0.6,
        "cpu": "cpu0",
        "dc": "mobile",
        "metric_name": "cpu.usage_user",
        "user": "ronnocol"
    }
}
```
In the above snippet, the following keys are dimensions:
* cpu
* dc
* user
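The split between HEC routing fields and dimensions can be sketched in a few lines of Go. This is a minimal illustration, not the serializer's actual code; `splitTags` is a hypothetical helper mirroring the rule that the reserved tag keys `host`, `index`, and `source` feed the HEC envelope while every other tag becomes a dimension:

```go
package main

import "fmt"

// splitTags is a hypothetical helper: the reserved tag keys "host",
// "index" and "source" become HEC routing fields, while every other
// tag becomes a dimension of the metric.
func splitTags(tags map[string]string) (routing, dimensions map[string]string) {
	routing = map[string]string{}
	dimensions = map[string]string{}
	for k, v := range tags {
		switch k {
		case "host", "index", "source":
			routing[k] = v
		default:
			dimensions[k] = v
		}
	}
	return routing, dimensions
}

func main() {
	routing, dims := splitTags(map[string]string{
		"host": "patas-mbp",
		"cpu":  "cpu0",
		"dc":   "mobile",
		"user": "ronnocol",
	})
	fmt.Println(len(routing), len(dims)) // prints "1 3"
}
```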

## Using with the HTTP output

To send this data to a Splunk HEC, you can use the HTTP output. There are some custom headers that you need to add
to manage the HEC authorization; here's a sample config for an HTTP output:

```toml
[[outputs.http]]
  ## URL is the address to send metrics to
  url = "https://localhost:8088/services/collector"

  ## Timeout for HTTP message
  # timeout = "5s"

  ## HTTP method, one of: "POST" or "PUT"
  # method = "POST"

  ## HTTP Basic Auth credentials
  # username = "username"
  # password = "pa$$word"

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Data format to output.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "splunkmetric"
  ## Provides time, index, source overrides for the HEC
  hec_routing = true

  ## Additional HTTP headers
  [outputs.http.headers]
    # Should be set manually to "application/json" for json data_format
    Content-Type = "application/json"
    Authorization = "Splunk xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    X-Splunk-Request-Channel = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
```

Contributor: Can you rename this variable splunk_hec_routing, due to how the parser variables are injected directly into the plugin we have started prefixing the variables. Long term, these will probably become tables and the parsers will become more independent.

Author: will do

Contributor: Just to keep things consistent, let's call it splunkmetric_hec_routing

Author: ok, will do.

## Overrides
You can override the default values for the HEC token you are using by adding additional tags to the config file.

The following aspects of the token can be overridden with tags:
* index
* source

You can either use `[global_tags]` or a more advanced configuration as documented [here](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md).

For example, this overrides the index just for the cpu metric:
```toml
[[inputs.cpu]]
  percpu = false
  totalcpu = true
  [inputs.cpu.tags]
    index = "cpu_metrics"
```

## Using with the File output

You can use the file output when running telegraf on a machine with a Splunk forwarder.

A sample event when `hec_routing` is false (or unset) looks like:
```javascript
{
    "_value": 0.6,
    "cpu": "cpu0",
    "dc": "mobile",
    "metric_name": "cpu.usage_user",
    "user": "ronnocol",
    "time": 1529708430
}
```
Data formatted in this manner can be ingested with a simple `props.conf` file that
looks like this:

```ini
[telegraf]
category = Metrics
description = Telegraf Metrics
pulldown_type = 1
DATETIME_CONFIG =
NO_BINARY_CHECK = true
SHOULD_LINEMERGE = true
disabled = false
INDEXED_EXTRACTIONS = json
KV_MODE = none
TIMESTAMP_FIELDS = time
TIME_FORMAT = %s.%3N
```

An example configuration of a file based output is:

```toml
# Send telegraf metrics to file(s)
[[outputs.file]]
  ## Files to write to, "stdout" is a specially handled file.
  files = ["/tmp/metrics.out"]

  ## Data format to output.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
  data_format = "splunkmetric"
  hec_routing = false
```
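As a quick sanity check of the flattened layout, the file-output event can be reproduced with nothing but the standard `encoding/json` package. This is a sketch, not the PR's code; `flattenEvent` is a hypothetical name, and Go sorts map keys when marshalling:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// flattenEvent is a hypothetical helper showing the hec_routing = false
// shape: no HEC envelope, just the field map with "time" folded in.
func flattenEvent(fields map[string]interface{}, epoch float64) ([]byte, error) {
	fields["time"] = epoch
	return json.Marshal(fields)
}

func main() {
	out, err := flattenEvent(map[string]interface{}{
		"metric_name": "cpu.usage_user",
		"_value":      0.6,
		"cpu":         "cpu0",
	}, 1529708430)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"_value":0.6,"cpu":"cpu0","metric_name":"cpu.usage_user","time":1529708430}
}
```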
125 changes: 125 additions & 0 deletions plugins/serializers/splunkmetric/splunkmetric.go
@@ -0,0 +1,125 @@
package splunkmetric

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/influxdata/telegraf"
)

type serializer struct {
    HecRouting bool
}

func NewSerializer(hec_routing bool) (*serializer, error) {
    s := &serializer{
        HecRouting: hec_routing,
    }
    return s, nil
}

func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {

    m, err := s.createObject(metric)
    if err != nil {
        log.Printf("D! [serializer.splunkmetric] Dropping invalid metric: %v [%v]", metric, m)
Contributor: We should return this as an error: return nil, fmt.Errorf("Dropping invalid metric: %s", metric.Name())
        return nil, nil
    }

    return m, nil
}

func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
Contributor: SerializeBatch seems like it's just a reinvention of json.Encoder. It might be better to use the existing standard lib functionality. It should also simplify your code a lot.
    var serialized []byte

    for _, metric := range metrics {
        m, err := s.createObject(metric)
        if err != nil {
            log.Printf("D! [serializer.splunkmetric] Dropping invalid metric: %v [%v]", metric, m)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should return error. If there are situations where a metric cannot be serialized but it is not an error, have createObject return nil, nil and check if m is nil before appending.

        } else {
            serialized = append(serialized, m...)
        }
    }

    return serialized, nil
}
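The reviewer's `json.Encoder` suggestion could look roughly like this. It is a sketch under the assumption that `createObject` produces one JSON-marshallable object per metric; `encodeBatch` is a hypothetical stand-in, not code from the PR:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// encodeBatch streams one object per metric through a single json.Encoder,
// which appends a newline after each document, instead of hand-concatenating
// the byte slices returned by per-metric serialization.
func encodeBatch(objects []map[string]interface{}) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, obj := range objects {
		if err := enc.Encode(obj); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := encodeBatch([]map[string]interface{}{
		{"metric_name": "cpu.usage_user", "_value": 0.6},
		{"metric_name": "cpu.usage_system", "_value": 0.2},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
	// {"_value":0.6,"metric_name":"cpu.usage_user"}
	// {"_value":0.2,"metric_name":"cpu.usage_system"}
}
```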

func (s *serializer) createObject(metric telegraf.Metric) (metricJson []byte, err error) {

Contributor: We do a bad job of explaining what is guaranteed in a telegraf.Metric, and actually as I write this I notice some additional checks I need to add.

Here is a brief rundown of things you may or may not need to check:

  • metric name may be an empty string
  • zero or more tags; tag keys are any non-empty string, tag values may be empty strings
  • zero, yes zero, or more fields; field keys are any non-empty string, field values may be any float64 (including NaN, +/-Inf), int64, uint64, string, or bool
  • time is any time.Time.

The part about tag/field keys not being empty strings is not true right now, but after writing this I am going to ensure this is the case in 1.8.

Author: Thanks, this actually caused me to re-examine the serializer with several different inputs, and I found a case in which metrics were lost (dropped), so I'm refactoring some of the code to deal with that. (Also, it's been a crazy week... so hope to get this done over the next few days.)

    /* Splunk supports one metric json object, and does _not_ support an array of JSON objects.
    ** Splunk has the following required names for the metric store:
    ** metric_name: The name of the metric
    ** _value: The value for the metric
    ** time: The timestamp for the metric
    ** All other index fields become dimensions.
    */
    type HECTimeSeries struct {
        Time   float64                `json:"time"`
        Event  string                 `json:"event"`
        Host   string                 `json:"host,omitempty"`
        Index  string                 `json:"index,omitempty"`
        Source string                 `json:"source,omitempty"`
        Fields map[string]interface{} `json:"fields"`
    }

    dataGroup := HECTimeSeries{}

    for k, v := range metric.Fields() {
Contributor: Use metric.FieldList() so that no allocation is performed.

Author: Part of the refactoring mentioned below (thanks for the suggestion)

        if !verifyValue(v) {
            log.Printf("D! Can not parse value: %v for key: %v", v, k)
Contributor: I think here we should just continue on the next field. Otherwise we will never be able to serialize a metric with a string field.

Author: Got it. Will include in the other changes being made.
            err := fmt.Errorf("D! Can not parse value: %v for key: %v", v, k)
            return nil, err
        }

        obj := map[string]interface{}{}
        obj["metric_name"] = metric.Name() + "." + k
        obj["_value"] = v

        dataGroup.Event = "metric"
        // Convert ns to float seconds since epoch.
        dataGroup.Time = float64(metric.Time().UnixNano()) / float64(1000000000)
Contributor: metric.Time().Unix() will give you unix time in seconds.

Author: I don't want seconds, I want ms but as a float (this is Splunk's spec...) e.g. 1536295485.123

        dataGroup.Fields = obj

        // Break tags out into key(n)=value(t) pairs
        for n, t := range metric.Tags() {
            if n == "host" {
                dataGroup.Host = t
            } else if n == "index" {
                dataGroup.Index = t
            } else if n == "source" {
                dataGroup.Source = t
            } else {
                dataGroup.Fields[n] = t
            }
        }
        dataGroup.Fields["metric_name"] = metric.Name() + "." + k
        dataGroup.Fields["_value"] = v
    }

    switch s.HecRouting {
    case true:
        // Output the data as a fields array and host,index,time,source overrides for the HEC.
        metricJson, err = json.Marshal(dataGroup)
    default:
        // Just output the data and the time, useful for file based outputs
        dataGroup.Fields["time"] = dataGroup.Time
        metricJson, err = json.Marshal(dataGroup.Fields)
    }

    if err != nil {
        return []byte(""), err
    }

    return metricJson, nil
}

func verifyValue(v interface{}) bool {
    switch v.(type) {
    case string:
        return false
    }
    return true
}