diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index a30ab88018eff..8cbdea68292e9 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -1,10 +1,8 @@ package kinesis import ( - "fmt" "log" "os" - "sync/atomic" "time" "github.com/aws/aws-sdk-go/aws" @@ -13,6 +11,7 @@ import ( "github.com/influxdata/telegraf" internalaws "github.com/influxdata/telegraf/internal/config/aws" "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" ) type KinesisOutput struct { @@ -26,9 +25,10 @@ type KinesisOutput struct { StreamName string `toml:"streamname"` PartitionKey string `toml:"partitionkey"` - Format string `toml:"format"` Debug bool `toml:"debug"` svc *kinesis.Kinesis + + serializer serializers.Serializer } var sampleConfig = ` @@ -54,9 +54,13 @@ var sampleConfig = ` streamname = "StreamName" ## PartitionKey as used for sharding data. partitionkey = "PartitionKey" - ## format of the Data payload in the kinesis PutRecord, supported - ## String and Custom. - format = "string" + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" + ## debug will show upstream aws messages. debug = false ` @@ -125,16 +129,8 @@ func (k *KinesisOutput) Close() error { return nil } -func FormatMetric(k *KinesisOutput, point telegraf.Metric) (string, error) { - if k.Format == "string" { - return point.String(), nil - } else { - m := fmt.Sprintf("%+v,%+v,%+v", - point.Name(), - point.Tags(), - point.String()) - return m, nil - } +func (k *KinesisOutput) SetSerializer(serializer serializers.Serializer) { + k.serializer = serializer } func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration { @@ -161,7 +157,7 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du } func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { - var sz uint32 = 0 + var sz uint32 if len(metrics) == 0 { return nil @@ -169,23 +165,29 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { r := []*kinesis.PutRecordsRequestEntry{} - for _, p := range metrics { - atomic.AddUint32(&sz, 1) + for _, metric := range metrics { + sz++ + + values, err := k.serializer.Serialize(metric) + if err != nil { + return err + } - metric, _ := FormatMetric(k, p) d := kinesis.PutRecordsRequestEntry{ - Data: []byte(metric), + Data: values, PartitionKey: aws.String(k.PartitionKey), } + r = append(r, &d) if sz == 500 { // Max Messages Per PutRecordRequest is 500 elapsed := writekinesis(k, r) log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) - atomic.StoreUint32(&sz, 0) + sz = 0 r = nil } + } writekinesis(k, r) diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go deleted file mode 100644 index de365fa997eae..0000000000000 --- a/plugins/outputs/kinesis/kinesis_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package kinesis - -import ( - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" - "testing" -) - -func TestFormatMetric(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - k := &KinesisOutput{ - Format: "string", - } - - p := testutil.MockMetrics()[0] - - valid_string := "test1,tag1=value1 value=1 1257894000000000000\n" - func_string, err := FormatMetric(k, p) - - if func_string != valid_string { - t.Error("Expected ", valid_string) - } - require.NoError(t, err) - - k = &KinesisOutput{ - Format: "custom", - } - - valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000\n" - func_custom, err := FormatMetric(k, p) - - if func_custom != valid_custom { - t.Error("Expected ", valid_custom) - } - require.NoError(t, err) -}