Skip to content

Commit

Permalink
Add HTTP helper for Metricsets (#3413)
Browse files Browse the repository at this point in the history
This should simplify the implementation of MetricSets based on HTTP.
  • Loading branch information
ruflin authored and andrewkroh committed Jan 20, 2017
1 parent 0a8ca7d commit 77cec5a
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 197 deletions.
85 changes: 85 additions & 0 deletions metricbeat/helper/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package helper

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/elastic/beats/metricbeat/mb"
)

type HTTP struct {
base mb.BaseMetricSet
client *http.Client // HTTP client that is reused across requests.
}

// NewHTTP creates new http helper
func NewHTTP(base mb.BaseMetricSet) *HTTP {
return &HTTP{
base: base,
client: &http.Client{Timeout: base.Module().Config().Timeout},
}
}

// FetchResponse fetches a response for the http metricset.
// It's important that resp.Body has to be closed if this method is used. Before using this method
// check if one of the other Fetch* methods could be used as they ensure that the Body is properly closed.
func (h *HTTP) FetchResponse() (*http.Response, error) {
req, err := http.NewRequest("GET", h.base.HostData().SanitizedURI, nil)
if h.base.HostData().User != "" || h.base.HostData().Password != "" {
req.SetBasicAuth(h.base.HostData().User, h.base.HostData().Password)
}
resp, err := h.client.Do(req)
if err != nil {
return nil, fmt.Errorf("error making http request: %v", err)
}

return resp, nil
}

// FetchContent makes an HTTP request to the configured url and returns the body content.
func (h *HTTP) FetchContent() ([]byte, error) {
resp, err := h.FetchResponse()
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.base.Name(), resp.Status)
}

return ioutil.ReadAll(resp.Body)
}

// FetchScanner returns a Scanner for the content.
func (h *HTTP) FetchScanner() (*bufio.Scanner, error) {
content, err := h.FetchContent()
if err != nil {
return nil, err
}

return bufio.NewScanner(bytes.NewReader(content)), nil
}

// FetchJSON makes an HTTP request to the configured url and returns the JSON content.
// This only works if the JSON output needed is in map[string]interface format.
func (h *HTTP) FetchJSON() (map[string]interface{}, error) {

body, err := h.FetchContent()
if err != nil {
return nil, err
}

var data map[string]interface{}

err = json.Unmarshal(body, &data)
if err != nil {
return nil, err
}

return data, nil
}
4 changes: 1 addition & 3 deletions metricbeat/module/apache/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package status

import (
"bufio"
"io"
"regexp"
"strings"

Expand Down Expand Up @@ -55,7 +54,7 @@ var (
)

// Map body to MapStr
func eventMapping(body io.ReadCloser, hostname string) common.MapStr {
func eventMapping(scanner *bufio.Scanner, hostname string) common.MapStr {
var (
totalS int
totalR int
Expand All @@ -72,7 +71,6 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr {
)

fullEvent := map[string]interface{}{}
scanner := bufio.NewScanner(body)

// Iterate through all events to gather data
for scanner.Scan() {
Expand Down
28 changes: 8 additions & 20 deletions metricbeat/module/apache/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
package status

import (
"fmt"
"net/http"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)
Expand Down Expand Up @@ -45,33 +43,23 @@ func init() {
// MetricSet for fetching Apache HTTPD server status.
type MetricSet struct {
mb.BaseMetricSet
client *http.Client // HTTP client that is reused across requests.
http *helper.HTTP
}

// New creates new instance of MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
client: &http.Client{Timeout: base.Module().Config().Timeout},
base,
helper.NewHTTP(base),
}, nil
}

// Fetch makes an HTTP request to fetch status metrics from the mod_status
// endpoint.
// Fetch makes an HTTP request to fetch status metrics from the mod_status endpoint.
func (m *MetricSet) Fetch() (common.MapStr, error) {
req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil)
if m.HostData().User != "" || m.HostData().Password != "" {
req.SetBasicAuth(m.HostData().User, m.HostData().Password)
}
resp, err := m.client.Do(req)
scanner, err := m.http.FetchScanner()
if err != nil {
return nil, fmt.Errorf("error making http request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status)
return nil, err
}

return eventMapping(resp.Body, m.Host()), nil
return eventMapping(scanner, m.Host()), nil
}
39 changes: 6 additions & 33 deletions metricbeat/module/couchbase/bucket/bucket.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package bucket

import (
"fmt"
"net/http"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)
Expand All @@ -31,53 +29,28 @@ func init() {
}

// MetricSet type defines all fields of the MetricSet
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with
// additional entries. These variables can be used to persist data or configuration between
// multiple fetch calls.
type MetricSet struct {
mb.BaseMetricSet
client *http.Client
http *helper.HTTP
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
logp.Warn("EXPERIMENTAL: The couchbase bucket metricset is experimental")

config := struct{}{}

if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

return &MetricSet{
BaseMetricSet: base,
client: &http.Client{Timeout: base.Module().Config().Timeout},
http: helper.NewHTTP(base),
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil)
if m.HostData().User != "" || m.HostData().Password != "" {
req.SetBasicAuth(m.HostData().User, m.HostData().Password)
}

resp, err := m.client.Do(req)

content, err := m.http.FetchContent()
if err != nil {
return nil, fmt.Errorf("error making http request: %v", err)
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("Error Connecting to Couchbase %d: %s", resp.StatusCode, resp.Status)
return nil, err
}

return eventsMapping(resp.Body), nil

return eventsMapping(content), nil
}
5 changes: 2 additions & 3 deletions metricbeat/module/couchbase/bucket/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bucket

import (
"encoding/json"
"io"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -30,10 +29,10 @@ type Buckets []struct {
BasicStats BucketBasicStats `json:"basicStats"`
}

func eventsMapping(body io.Reader) []common.MapStr {
func eventsMapping(content []byte) []common.MapStr {

var d Buckets
err := json.NewDecoder(body).Decode(&d)
err := json.Unmarshal(content, &d)
if err != nil {
logp.Err("Error: ", err)
}
Expand Down
38 changes: 6 additions & 32 deletions metricbeat/module/couchbase/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package cluster

import (
"fmt"
"net/http"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)
Expand All @@ -31,52 +29,28 @@ func init() {
}

// MetricSet type defines all fields of the MetricSet
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with
// additional entries. These variables can be used to persist data or configuration between
// multiple fetch calls.
type MetricSet struct {
mb.BaseMetricSet
client *http.Client
http *helper.HTTP
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
logp.Warn("EXPERIMENTAL: The couchbase cluster metricset is experimental")

config := struct{}{}

if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

return &MetricSet{
BaseMetricSet: base,
client: &http.Client{Timeout: base.Module().Config().Timeout},
http: helper.NewHTTP(base),
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch() (common.MapStr, error) {
req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil)
if m.HostData().User != "" || m.HostData().Password != "" {
req.SetBasicAuth(m.HostData().User, m.HostData().Password)
}

resp, err := m.client.Do(req)

content, err := m.http.FetchContent()
if err != nil {
return nil, fmt.Errorf("error making http request: %v", err)
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("Error Connecting to Couchbase %d: %s", resp.StatusCode, resp.Status)
return nil, err
}

return eventMapping(resp.Body), nil
return eventMapping(content), nil
}
5 changes: 2 additions & 3 deletions metricbeat/module/couchbase/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cluster

import (
"encoding/json"
"io"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -41,10 +40,10 @@ type Data struct {
MaxBucketCount int64 `json:"maxBucketCount"`
}

func eventMapping(body io.Reader) common.MapStr {
func eventMapping(content []byte) common.MapStr {

var d Data
err := json.NewDecoder(body).Decode(&d)
err := json.Unmarshal(content, &d)
if err != nil {
logp.Err("Error: ", err)
}
Expand Down
8 changes: 4 additions & 4 deletions metricbeat/module/couchbase/node/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package node

import (
"encoding/json"
"io"

"strconv"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"strconv"
)

type NodeSystemStats struct {
Expand Down Expand Up @@ -56,10 +56,10 @@ type Data struct {
Nodes []Node `json:"nodes"`
}

func eventsMapping(body io.Reader) []common.MapStr {
func eventsMapping(content []byte) []common.MapStr {

var d Data
err := json.NewDecoder(body).Decode(&d)
err := json.Unmarshal(content, &d)
if err != nil {
logp.Err("Error: ", err)
}
Expand Down
Loading

0 comments on commit 77cec5a

Please sign in to comment.