Skip to content

Commit

Permalink
Moved to using the inbuilt serializer. (#1942)
Browse files Browse the repository at this point in the history
* Moved to using the inbuilt serializer.

* Remove Atomic variable as it is not required.

* Adjusted metric type in line with latest changes.
  • Loading branch information
wolfeidau authored and sparrc committed Dec 20, 2016
1 parent 73acd11 commit 829c190
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 61 deletions.
46 changes: 24 additions & 22 deletions plugins/outputs/kinesis/kinesis.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package kinesis

import (
"fmt"
"log"
"os"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -13,6 +11,7 @@ import (
"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

type KinesisOutput struct {
Expand All @@ -26,9 +25,10 @@ type KinesisOutput struct {

StreamName string `toml:"streamname"`
PartitionKey string `toml:"partitionkey"`
Format string `toml:"format"`
Debug bool `toml:"debug"`
svc *kinesis.Kinesis

serializer serializers.Serializer
}

var sampleConfig = `
Expand All @@ -54,9 +54,13 @@ var sampleConfig = `
streamname = "StreamName"
## PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
## format of the Data payload in the kinesis PutRecord, supported
## String and Custom.
format = "string"
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
## debug will show upstream aws messages.
debug = false
`
Expand Down Expand Up @@ -125,16 +129,8 @@ func (k *KinesisOutput) Close() error {
return nil
}

func FormatMetric(k *KinesisOutput, point telegraf.Metric) (string, error) {
if k.Format == "string" {
return point.String(), nil
} else {
m := fmt.Sprintf("%+v,%+v,%+v",
point.Name(),
point.Tags(),
point.String())
return m, nil
}
func (k *KinesisOutput) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}

func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration {
Expand All @@ -161,31 +157,37 @@ func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Du
}

func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
var sz uint32 = 0
var sz uint32

if len(metrics) == 0 {
return nil
}

r := []*kinesis.PutRecordsRequestEntry{}

for _, p := range metrics {
atomic.AddUint32(&sz, 1)
for _, metric := range metrics {
sz++

values, err := k.serializer.Serialize(metric)
if err != nil {
return err
}

metric, _ := FormatMetric(k, p)
d := kinesis.PutRecordsRequestEntry{
Data: []byte(metric),
Data: values,
PartitionKey: aws.String(k.PartitionKey),
}

r = append(r, &d)

if sz == 500 {
// Max Messages Per PutRecordRequest is 500
elapsed := writekinesis(k, r)
log.Printf("E! Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
atomic.StoreUint32(&sz, 0)
sz = 0
r = nil
}

}

writekinesis(k, r)
Expand Down
39 changes: 0 additions & 39 deletions plugins/outputs/kinesis/kinesis_test.go

This file was deleted.

0 comments on commit 829c190

Please sign in to comment.