Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Amon output #350

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package all

import (
_ "github.com/influxdb/telegraf/outputs/amon"
_ "github.com/influxdb/telegraf/outputs/amqp"
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
Expand Down
9 changes: 9 additions & 0 deletions outputs/amon/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Amon Output Plugin

This plugin writes to [Amon](https://www.amon.cx)
and requires an `serverkey` and `amoninstance` URL which can be obtained [here](https://www.amon.cx/docs/monitoring/)
for the account.

If the point value being sent cannot be converted to a float64, the metric is skipped.

Metrics are grouped by converting any `_` characters to `.` in the Point Name.
148 changes: 148 additions & 0 deletions outputs/amon/amon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package amon

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"

"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/duration"
"github.com/influxdb/telegraf/outputs"
)

type Amon struct {
ServerKey string
AmonInstance string
Timeout duration.Duration

client *http.Client
}

var sampleConfig = `
# Amon Server Key
serverkey = "my-server-key" # required.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be server_key


# Amon Insance URL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling

amoninstance = "https://youramoninstance" # required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amon_instance


# Connection timeout.
# timeout = "5s"
`

type TimeSeries struct {
Series []*Metric `json:"series"`
}

type Metric struct {
Metric string `json:"metric"`
Points [1]Point `json:"points"`
}

type Point [2]float64

func (a *Amon) Connect() error {
if a.ServerKey == "" || a.AmonInstance == "" {
return fmt.Errorf("serverkey and amoninstance are required fields for amon output")
}
a.client = &http.Client{
Timeout: a.Timeout.Duration,
}
return nil
}

func (a *Amon) Write(points []*client.Point) error {
if len(points) == 0 {
return nil
}
ts := TimeSeries{}
var tempSeries = make([]*Metric, len(points))
var acceptablePoints = 0
for _, pt := range points {
metric := &Metric{
Metric: strings.Replace(pt.Name(), "_", ".", -1),
}
if p, err := buildPoint(pt); err == nil {
metric.Points[0] = p
tempSeries[acceptablePoints] = metric
acceptablePoints += 1
} else {
log.Printf("unable to build Metric for %s, skipping\n", pt.Name())
}
}
ts.Series = make([]*Metric, acceptablePoints)
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts)
if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
}
req, err := http.NewRequest("POST", a.authenticatedUrl(), bytes.NewBuffer(tsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")

resp, err := a.client.Do(req)
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode > 209 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}

return nil
}

func (a *Amon) SampleConfig() string {
return sampleConfig
}

func (a *Amon) Description() string {
return "Configuration for Amon Server to send metrics to."
}

func (a *Amon) authenticatedUrl() string {

return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey)
}

func buildPoint(pt *client.Point) (Point, error) {
var p Point
if err := p.setValue(pt.Fields()["value"]); err != nil {
return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
}
p[0] = float64(pt.Time().Unix())
return p, nil
}

func (p *Point) setValue(v interface{}) error {
switch d := v.(type) {
case int:
p[1] = float64(int(d))
case int32:
p[1] = float64(int32(d))
case int64:
p[1] = float64(int64(d))
case float32:
p[1] = float64(d)
case float64:
p[1] = float64(d)
default:
return fmt.Errorf("undeterminable type")
}
return nil
}

func (a *Amon) Close() error {
return nil
}

func init() {
outputs.Add("amon", func() outputs.Output {
return &Amon{}
})
}
163 changes: 163 additions & 0 deletions outputs/amon/amon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package amon

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/influxdb/telegraf/testutil"

"github.com/influxdb/influxdb/client/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
fakeServerKey = "123456"
fakeAmonInstance = "https://demo.amon.cx"
)

func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(`{"status":"ok"}`)
}))
defer ts.Close()

a := &Amon{
ServerKey: fakeServerKey,
AmonInstance: fakeAmonInstance,
}

err := a.Connect()
require.NoError(t, err)
err = a.Write(testutil.MockBatchPoints().Points())
require.NoError(t, err)
}

func TestAuthenticatedUrl(t *testing.T) {
a := &Amon{
ServerKey: fakeServerKey,
AmonInstance: fakeAmonInstance,
}

authUrl := a.authenticatedUrl()
assert.EqualValues(t, fmt.Sprintf("%s/api/system/%s", fakeAmonInstance, fakeServerKey), authUrl)
}

func TestBuildPoint(t *testing.T) {
tags := make(map[string]string)
var tagtests = []struct {
ptIn *client.Point
outPt Point
err error
}{
{
client.NewPoint(
"test1",
tags,
map[string]interface{}{"value": 0.0},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
Point{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
0.0,
},
nil,
},
{
client.NewPoint(
"test2",
tags,
map[string]interface{}{"value": 1.0},
time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
),
Point{
float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()),
1.0,
},
nil,
},
{
client.NewPoint(
"test3",
tags,
map[string]interface{}{"value": 10},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
Point{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
10.0,
},
nil,
},
{
client.NewPoint(
"test4",
tags,
map[string]interface{}{"value": int32(112345)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
Point{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
112345.0,
},
nil,
},
{
client.NewPoint(
"test5",
tags,
map[string]interface{}{"value": int64(112345)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
Point{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
112345.0,
},
nil,
},
{
client.NewPoint(
"test6",
tags,
map[string]interface{}{"value": float32(11234.5)},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
Point{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
11234.5,
},
nil,
},
{
client.NewPoint(
"test7",
tags,
map[string]interface{}{"value": "11234.5"},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
),
Point{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
11234.5,
},
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
},
}
for _, tt := range tagtests {
pt, err := buildPoint(tt.ptIn)
if err != nil && tt.err == nil {
t.Errorf("%s: unexpected error, %+v\n", tt.ptIn.Name(), err)
}
if tt.err != nil && err == nil {
t.Errorf("%s: expected an error (%s) but none returned", tt.ptIn.Name(), tt.err.Error())
}
if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil {
t.Errorf("%s: \nexpected %+v\ngot %+v\n", tt.ptIn.Name(), tt.outPt, pt)
}
}
}