Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP output plugin #2491

Merged
merged 20 commits into from
May 15, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"
_ "github.com/influxdata/telegraf/plugins/outputs/http"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
Expand Down
32 changes: 32 additions & 0 deletions plugins/outputs/http/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# HTTP Output Plugin

This plugin writes to a HTTP Server using the `POST Method`.

Data collected from telegraf is sent in the Request Body.

### Configuration:

```toml
# Send telegraf metrics to HTTP Server(s)
[[outputs.http]]
## It requires a url name.
## Will be transmitted telegraf metrics to the HTTP Server using the below URL.
## Note that not support the HTTPS.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add support for the regular HTTPS options once this is merged, but this code will support connecting to a HTTPS url as it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to do additional work on HTTPS after you first merge it? I have not worked on this part.

url = "http://127.0.0.1:8080/metric"
## http_headers option can add a custom header to the request.
## The value is written as a delimiter(:).
## Content-Type is required http header in http plugin.
## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out.
http_headers = [ "Content-Type:application/json" ]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should do this in the same format as the httpjson input:

[outputs.http]
  [outputs.http.headers]
    Content-Type = "application/json"

This can also be written as an inline table, which is similar to the array version:

[outputs.http]
  headers = {Content-Type = "application/json"}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed to map[string]string.
awesome! :)

## With this HTTP status code, the http plugin checks that the HTTP request is completed normally.
## As a result, any status code that is not a specified status code is considered to be an error condition and processed.
success_status_codes = [ 200, 201, 204 ]
## Configure dial timeout in seconds. Default : 3
timeout = 3

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
```
183 changes: 183 additions & 0 deletions plugins/outputs/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package http

import (
"bytes"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/json"
"net/http"
"strings"
"time"
)

var sampleConfig = `
## It requires a url name.
## Will be transmitted telegraf metrics to the HTTP Server using the below URL.
## Note that not support the HTTPS.
url = "http://127.0.0.1:8080/metric"
## http_headers option can add a custom header to the request.
## The value is written as a delimiter(:).
## Content-Type is required http header in http plugin.
## so content-type of HTTP specification (plain/text, application/json, etc...) must be filled out.
http_headers = [ "Content-Type:application/json" ]
## With this HTTP status code, the http plugin checks that the HTTP request is completed normally.
## As a result, any status code that is not a specified status code is considered to be an error condition and processed.
success_status_codes = [ 200, 201, 204 ]
## Configure http.Client.Timeout in seconds. Default : 3
timeout = 3

## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"
`

const (
POST = "POST"

DEFAULT_TIME_OUT = 3
)

type Http struct {
// http required option
URL string `toml:"url"`
HttpHeaders []string `toml:"http_headers"`
SuccessStatusCodes []int `toml:"success_status_codes"`

// Option with http default value
Timeout int `toml:"timeout"`

client http.Client
serializer serializers.Serializer
// SuccessStatusCode that stores option values received with expected_status_codes
SuccessStatusCode map[int]bool
}

func (h *Http) SetSerializer(serializer serializers.Serializer) {
h.serializer = serializer
}

// Connect to the Output
func (h *Http) Connect() error {
h.client = http.Client{
Timeout: time.Duration(h.Timeout) * time.Second,
}

return nil
}

// Close is not implemented. Because http.Client not provided connection close policy. Instead, uses the response.Body.Close() pattern.
func (h *Http) Close() error {
return nil
}

// Description A plugin that can transmit metrics over HTTP
func (h *Http) Description() string {
return "A plugin that can transmit metrics over HTTP"
}

// SampleConfig provides sample example for developer
func (h *Http) SampleConfig() string {
return sampleConfig
}

// Writes metrics over HTTP POST
func (h *Http) Write(metrics []telegraf.Metric) error {
var mCount int
var reqBodyBuf []byte

for _, m := range metrics {
buf, err := h.serializer.Serialize(m)

if err != nil {
return fmt.Errorf("E! Error serializing some metrics: %s", err.Error())
}

reqBodyBuf = append(reqBodyBuf, buf...)
mCount++
}

reqBody, err := makeReqBody(h.serializer, reqBodyBuf, mCount)

if err != nil {
return fmt.Errorf("E! Error serialized metric is not assembled : %s", err.Error())
}

if err := h.write(reqBody); err != nil {
return err
}

return nil
}

func (h *Http) write(reqBody []byte) error {
req, err := http.NewRequest(POST, h.URL, bytes.NewBuffer(reqBody))

for _, httpHeader := range h.HttpHeaders {
keyAndValue := strings.Split(httpHeader, ":")
req.Header.Set(keyAndValue[0], keyAndValue[1])
}

resp, err := h.client.Do(req)

if err := h.isOk(resp, err); err != nil {
return err
}

defer resp.Body.Close()

return err
}

func (h *Http) isOk(resp *http.Response, err error) error {
if resp == nil || err != nil {
return fmt.Errorf("E! %s request failed! %s.", h.URL, err.Error())
}

if !h.isExpStatusCode(resp.StatusCode) {
return fmt.Errorf("E! %s response is unexpected status code : %d.", h.URL, resp.StatusCode)
}

return nil
}

func (h *Http) isExpStatusCode(resStatusCode int) bool {
if h.SuccessStatusCode == nil {
h.SuccessStatusCode = make(map[int]bool)

for _, expectedStatusCode := range h.SuccessStatusCodes {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that any 2xx status code should be considered accepted. https://en.wikipedia.org/wiki/List_of_HTTP_status_codes#2xx_Success

This is also how the Datadog plugin works: https://github.com/influxdata/telegraf/blob/master/plugins/outputs/datadog/datadog.go#L123

If agreed, then this could be simplified as follows:

if resp.StatusCode < 200 || resp.StatusCode > 209 {
    return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good idea.
I agree that.
I'll fix the code.

h.SuccessStatusCode[expectedStatusCode] = true
}
}

if h.SuccessStatusCode[resStatusCode] {
return true
}

return false
}

// makeReqBody translates each serializer's converted metric into a request body.
func makeReqBody(serializer serializers.Serializer, reqBodyBuf []byte, mCount int) ([]byte, error) {
switch serializer.(type) {
case *json.JsonSerializer:
var arrayJsonObj []byte
arrayJsonObj = append(arrayJsonObj, []byte("[")...)
arrayJsonObj = append(arrayJsonObj, reqBodyBuf...)
arrayJsonObj = append(arrayJsonObj, []byte("]")...)
return bytes.Replace(arrayJsonObj, []byte("\n"), []byte(","), mCount-1), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand why you did this since the Serializer interface does not provide a way to process a list of metrics, but I don't think it is robust enough. I will update the Serializer interface to take a []telegraf.Metrics and then we can do this in a better way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a code that processes each individual metric using a serializer and creates a Body according to the Content-Type. This section has been modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can see 92 line about serializer.

default:
return reqBodyBuf, nil
}
}

func init() {
outputs.Add("http", func() telegraf.Output {
return &Http{
Timeout: DEFAULT_TIME_OUT,
}
})
}
Loading