Skip to content

Commit

Permalink
Add Cloudwatch output
Browse files Browse the repository at this point in the history
closes #553
  • Loading branch information
stephen-kwong authored and sparrc committed Jan 21, 2016
1 parent f24f5e9 commit e0dc1ef
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- [#551](https://github.com/influxdata/telegraf/pull/551): Statsd UDP read packet size now defaults to 1500 bytes, and is configurable.
- [#552](https://github.com/influxdata/telegraf/pull/552): Support for collection interval jittering.
- [#484](https://github.com/influxdata/telegraf/issues/484): Include usage percent with procstat metrics.
- [#553](https://github.com/influxdata/telegraf/pull/553): Amazon CloudWatch output. thanks @skwong2!

### Bugfixes
- [#506](https://github.com/influxdata/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,11 @@ want to add support for another service or third-party API.
* influxdb
* amon
* amqp
* aws kinesis
* aws cloudwatch
* datadog
* graphite
* kafka
* amazon kinesis
* librato
* mqtt
* nsq
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdata/telegraf/plugins/outputs/amon"
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
Expand Down
33 changes: 33 additions & 0 deletions plugins/outputs/cloudwatch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
## Amazon CloudWatch Output for Telegraf

This plugin will send points to Amazon CloudWatch.

## Amazon Authentication

This plugin uses a credential chain for Authentication with the CloudWatch
API endpoint. In the following order the plugin will attempt to authenticate.
1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk)
3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk)

## Config

For this output plugin to function correctly the following variables
must be configured.

* region
* namespace

### region

The region is the Amazon region that you wish to connect to.
Examples include but are not limited to:
* us-west-1
* us-west-2
* us-east-1
* ap-southeast-1
* ap-southeast-2

### namespace

The namespace used for AWS CloudWatch metrics.
236 changes: 236 additions & 0 deletions plugins/outputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package cloudwatch

import (
"log"
"math"
"sort"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"

"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/plugins/outputs"
)

type CloudWatch struct {
Region string // AWS Region
Namespace string // CloudWatch Metrics Namespace
svc *cloudwatch.CloudWatch
}

var sampleConfig = `
# Amazon REGION
region = 'us-east-1'
# Namespace for the CloudWatch MetricDatums
namespace = 'InfluxData/Telegraf'
`

func (c *CloudWatch) SampleConfig() string {
return sampleConfig
}

func (c *CloudWatch) Description() string {
return "Configuration for AWS CloudWatch output."
}

func (c *CloudWatch) Connect() error {
Config := &aws.Config{
Region: aws.String(c.Region),
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
}),
}

svc := cloudwatch.New(session.New(Config))

params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(c.Namespace),
}

_, err := svc.ListMetrics(params) // Try a read-only call to test connection.

if err != nil {
log.Printf("cloudwatch: Error in ListMetrics API call : %+v \n", err.Error())
}

c.svc = svc

return err
}

func (c *CloudWatch) Close() error {
return nil
}

func (c *CloudWatch) Write(points []*client.Point) error {
for _, pt := range points {
err := c.WriteSinglePoint(pt)
if err != nil {
return err
}
}

return nil
}

// Write data for a single point. A point can have many fields and one field
// is equal to one MetricDatum. There is a limit on how many MetricDatums a
// request can have so we process one Point at a time.
func (c *CloudWatch) WriteSinglePoint(point *client.Point) error {
datums := BuildMetricDatum(point)

const maxDatumsPerCall = 20 // PutMetricData only supports up to 20 data points per call

for _, partition := range PartitionDatums(maxDatumsPerCall, datums) {
err := c.WriteToCloudWatch(partition)

if err != nil {
return err
}
}

return nil
}

func (c *CloudWatch) WriteToCloudWatch(datums []*cloudwatch.MetricDatum) error {
params := &cloudwatch.PutMetricDataInput{
MetricData: datums,
Namespace: aws.String(c.Namespace),
}

_, err := c.svc.PutMetricData(params)

if err != nil {
log.Printf("CloudWatch: Unable to write to CloudWatch : %+v \n", err.Error())
}

return err
}

// Partition the MetricDatums into smaller slices of a max size so that are under the limit
// for the AWS API calls.
func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum {

numberOfPartitions := len(datums) / size
if len(datums)%size != 0 {
numberOfPartitions += 1
}

partitions := make([][]*cloudwatch.MetricDatum, numberOfPartitions)

for i := 0; i < numberOfPartitions; i++ {
start := size * i
end := size * (i + 1)
if end > len(datums) {
end = len(datums)
}

partitions[i] = datums[start:end]
}

return partitions
}

// Make a MetricDatum for each field in a Point. Only fields with values that can be
// converted to float64 are supported. Non-supported fields are skipped.
func BuildMetricDatum(point *client.Point) []*cloudwatch.MetricDatum {
datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
i := 0

var value float64

for k, v := range point.Fields() {
switch t := v.(type) {
case int:
value = float64(t)
case int32:
value = float64(t)
case int64:
value = float64(t)
case float64:
value = t
case bool:
if t {
value = 1
} else {
value = 0
}
case time.Time:
value = float64(t.Unix())
default:
// Skip unsupported type.
datums = datums[:len(datums)-1]
continue
}

datums[i] = &cloudwatch.MetricDatum{
MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")),
Value: aws.Float64(value),
Dimensions: BuildDimensions(point.Tags()),
Timestamp: aws.Time(point.Time()),
}

i += 1
}

return datums
}

// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to
// 10 dimensions per metric so we only keep up to the first 10 alphabetically.
// This always includes the "host" tag if it exists.
func BuildDimensions(ptTags map[string]string) []*cloudwatch.Dimension {

const MaxDimensions = 10
dimensions := make([]*cloudwatch.Dimension, int(math.Min(float64(len(ptTags)), MaxDimensions)))

i := 0

// This is pretty ugly but we always want to include the "host" tag if it exists.
if host, ok := ptTags["host"]; ok {
dimensions[i] = &cloudwatch.Dimension{
Name: aws.String("host"),
Value: aws.String(host),
}
i += 1
}

var keys []string
for k := range ptTags {
if k != "host" {
keys = append(keys, k)
}
}
sort.Strings(keys)

for _, k := range keys {
if i >= MaxDimensions {
break
}

dimensions[i] = &cloudwatch.Dimension{
Name: aws.String(k),
Value: aws.String(ptTags[k]),
}

i += 1
}

return dimensions
}

func init() {
outputs.Add("cloudwatch", func() outputs.Output {
return &CloudWatch{}
})
}
88 changes: 88 additions & 0 deletions plugins/outputs/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cloudwatch

import (
"sort"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"

"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/telegraf/testutil"

"github.com/stretchr/testify/assert"
)

// Test that each tag becomes one dimension
func TestBuildDimensions(t *testing.T) {
const MaxDimensions = 10

assert := assert.New(t)

testPoint := testutil.TestPoint(1)
dimensions := BuildDimensions(testPoint.Tags())

tagKeys := make([]string, len(testPoint.Tags()))
i := 0
for k, _ := range testPoint.Tags() {
tagKeys[i] = k
i += 1
}

sort.Strings(tagKeys)

if len(testPoint.Tags()) >= MaxDimensions {
assert.Equal(MaxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions")
} else {
assert.Equal(len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags")
}

for i, key := range tagKeys {
if i >= 10 {
break
}
assert.Equal(key, *dimensions[i].Name, "Key should be equal")
assert.Equal(testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal")
}
}

// Test that points with valid values have a MetricDatum created where as non valid do not.
// Skips "time.Time" type as something is converting the value to string.
func TestBuildMetricDatums(t *testing.T) {
assert := assert.New(t)

validPoints := []*client.Point{
testutil.TestPoint(1),
testutil.TestPoint(int32(1)),
testutil.TestPoint(int64(1)),
testutil.TestPoint(float64(1)),
testutil.TestPoint(true),
}

for _, point := range validPoints {
datums := BuildMetricDatum(point)
assert.Equal(1, len(datums), "Valid type should create a Datum")
}

nonValidPoint := testutil.TestPoint("Foo")

assert.Equal(0, len(BuildMetricDatum(nonValidPoint)), "Invalid type should not create a Datum")
}

func TestPartitionDatums(t *testing.T) {

assert := assert.New(t)

testDatum := cloudwatch.MetricDatum{
MetricName: aws.String("Foo"),
Value: aws.Float64(1),
}

oneDatum := []*cloudwatch.MetricDatum{&testDatum}
twoDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum}
threeDatum := []*cloudwatch.MetricDatum{&testDatum, &testDatum, &testDatum}

assert.Equal([][]*cloudwatch.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum))
assert.Equal([][]*cloudwatch.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum))
assert.Equal([][]*cloudwatch.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum))
}

0 comments on commit e0dc1ef

Please sign in to comment.