From 77cec5af9b0af20a7defae43778c451d1f53c945 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Fri, 20 Jan 2017 19:03:38 +0100 Subject: [PATCH] Add HTTP helper for Metricsets (#3413) This should simplify the implementation of MetricSets based on HTTP. --- metricbeat/helper/http.go | 85 +++++++++++++++++++ metricbeat/module/apache/status/data.go | 4 +- metricbeat/module/apache/status/status.go | 28 ++---- metricbeat/module/couchbase/bucket/bucket.go | 39 ++------- metricbeat/module/couchbase/bucket/data.go | 5 +- .../module/couchbase/cluster/cluster.go | 38 ++------- metricbeat/module/couchbase/cluster/data.go | 5 +- metricbeat/module/couchbase/node/data.go | 8 +- metricbeat/module/couchbase/node/node.go | 37 ++------ metricbeat/module/nginx/stubstatus/data.go | 8 +- .../module/nginx/stubstatus/stubstatus.go | 28 ++---- .../module/prometheus/collector/collector.go | 26 ++---- metricbeat/module/prometheus/stats/stats.go | 25 ++---- 13 files changed, 139 insertions(+), 197 deletions(-) create mode 100644 metricbeat/helper/http.go diff --git a/metricbeat/helper/http.go b/metricbeat/helper/http.go new file mode 100644 index 00000000000..eb01376d2c1 --- /dev/null +++ b/metricbeat/helper/http.go @@ -0,0 +1,85 @@ +package helper + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "github.com/elastic/beats/metricbeat/mb" +) + +type HTTP struct { + base mb.BaseMetricSet + client *http.Client // HTTP client that is reused across requests. +} + +// NewHTTP creates new http helper +func NewHTTP(base mb.BaseMetricSet) *HTTP { + return &HTTP{ + base: base, + client: &http.Client{Timeout: base.Module().Config().Timeout}, + } +} + +// FetchResponse fetches a response for the http metricset. +// It's important that resp.Body has to be closed if this method is used. Before using this method +// check if one of the other Fetch* methods could be used as they ensure that the Body is properly closed. +func (h *HTTP) FetchResponse() (*http.Response, error) { + req, err := http.NewRequest("GET", h.base.HostData().SanitizedURI, nil) + if h.base.HostData().User != "" || h.base.HostData().Password != "" { + req.SetBasicAuth(h.base.HostData().User, h.base.HostData().Password) + } + resp, err := h.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making http request: %v", err) + } + + return resp, nil +} + +// FetchContent makes an HTTP request to the configured url and returns the body content. +func (h *HTTP) FetchContent() ([]byte, error) { + resp, err := h.FetchResponse() + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.base.Name(), resp.Status) + } + + return ioutil.ReadAll(resp.Body) +} + +// FetchScanner returns a Scanner for the content. +func (h *HTTP) FetchScanner() (*bufio.Scanner, error) { + content, err := h.FetchContent() + if err != nil { + return nil, err + } + + return bufio.NewScanner(bytes.NewReader(content)), nil +} + +// FetchJSON makes an HTTP request to the configured url and returns the JSON content. +// This only works if the JSON output needed is in map[string]interface format. +func (h *HTTP) FetchJSON() (map[string]interface{}, error) { + + body, err := h.FetchContent() + if err != nil { + return nil, err + } + + var data map[string]interface{} + + err = json.Unmarshal(body, &data) + if err != nil { + return nil, err + } + + return data, nil +} diff --git a/metricbeat/module/apache/status/data.go b/metricbeat/module/apache/status/data.go index 05d5712bfa2..72fd73e9c8e 100644 --- a/metricbeat/module/apache/status/data.go +++ b/metricbeat/module/apache/status/data.go @@ -2,7 +2,6 @@ package status import ( "bufio" - "io" "regexp" "strings" @@ -55,7 +54,7 @@ var ( ) // Map body to MapStr -func eventMapping(body io.ReadCloser, hostname string) common.MapStr { +func eventMapping(scanner *bufio.Scanner, hostname string) common.MapStr { var ( totalS int totalR int @@ -72,7 +71,6 @@ func eventMapping(body io.ReadCloser, hostname string) common.MapStr { ) fullEvent := map[string]interface{}{} - scanner := bufio.NewScanner(body) // Iterate through all events to gather data for scanner.Scan() { diff --git a/metricbeat/module/apache/status/status.go b/metricbeat/module/apache/status/status.go index 6bba5e5fedd..c11fd8688e0 100644 --- a/metricbeat/module/apache/status/status.go +++ b/metricbeat/module/apache/status/status.go @@ -2,11 +2,9 @@ package status import ( - "fmt" - "net/http" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -45,33 +43,23 @@ func init() { // MetricSet for fetching Apache HTTPD server status. type MetricSet struct { mb.BaseMetricSet - client *http.Client // HTTP client that is reused across requests. + http *helper.HTTP } // New creates new instance of MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ - BaseMetricSet: base, - client: &http.Client{Timeout: base.Module().Config().Timeout}, + base, + helper.NewHTTP(base), }, nil } -// Fetch makes an HTTP request to fetch status metrics from the mod_status -// endpoint. +// Fetch makes an HTTP request to fetch status metrics from the mod_status endpoint. func (m *MetricSet) Fetch() (common.MapStr, error) { - req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil) - if m.HostData().User != "" || m.HostData().Password != "" { - req.SetBasicAuth(m.HostData().User, m.HostData().Password) - } - resp, err := m.client.Do(req) + scanner, err := m.http.FetchScanner() if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + return nil, err } - return eventMapping(resp.Body, m.Host()), nil + return eventMapping(scanner, m.Host()), nil } diff --git a/metricbeat/module/couchbase/bucket/bucket.go b/metricbeat/module/couchbase/bucket/bucket.go index 6d95b315948..e0ac9d4af50 100644 --- a/metricbeat/module/couchbase/bucket/bucket.go +++ b/metricbeat/module/couchbase/bucket/bucket.go @@ -1,11 +1,9 @@ package bucket import ( - "fmt" - "net/http" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -31,29 +29,18 @@ func init() { } // MetricSet type defines all fields of the MetricSet -// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with -// additional entries. These variables can be used to persist data or configuration between -// multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - client *http.Client + http *helper.HTTP } // New create a new instance of the MetricSet -// Part of new is also setting up the configuration by processing additional -// configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { logp.Warn("EXPERIMENTAL: The couchbase bucket metricset is experimental") - config := struct{}{} - - if err := base.Module().UnpackConfig(&config); err != nil { - return nil, err - } - return &MetricSet{ BaseMetricSet: base, - client: &http.Client{Timeout: base.Module().Config().Timeout}, + http: helper.NewHTTP(base), }, nil } @@ -61,23 +48,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil) - if m.HostData().User != "" || m.HostData().Password != "" { - req.SetBasicAuth(m.HostData().User, m.HostData().Password) - } - - resp, err := m.client.Do(req) - + content, err := m.http.FetchContent() if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) - } - - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("Error Connecting to Couchbase %d: %s", resp.StatusCode, resp.Status) + return nil, err } - - return eventsMapping(resp.Body), nil - + return eventsMapping(content), nil } diff --git a/metricbeat/module/couchbase/bucket/data.go b/metricbeat/module/couchbase/bucket/data.go index e026d0b59df..b2b1f1920a3 100644 --- a/metricbeat/module/couchbase/bucket/data.go +++ b/metricbeat/module/couchbase/bucket/data.go @@ -2,7 +2,6 @@ package bucket import ( "encoding/json" - "io" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -30,10 +29,10 @@ type Buckets []struct { BasicStats BucketBasicStats `json:"basicStats"` } -func eventsMapping(body io.Reader) []common.MapStr { +func eventsMapping(content []byte) []common.MapStr { var d Buckets - err := json.NewDecoder(body).Decode(&d) + err := json.Unmarshal(content, &d) if err != nil { logp.Err("Error: ", err) } diff --git a/metricbeat/module/couchbase/cluster/cluster.go b/metricbeat/module/couchbase/cluster/cluster.go index f26e0420a39..9de12620531 100644 --- a/metricbeat/module/couchbase/cluster/cluster.go +++ b/metricbeat/module/couchbase/cluster/cluster.go @@ -1,11 +1,9 @@ package cluster import ( - "fmt" - "net/http" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -31,29 +29,18 @@ func init() { } // MetricSet type defines all fields of the MetricSet -// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with -// additional entries. These variables can be used to persist data or configuration between -// multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - client *http.Client + http *helper.HTTP } // New create a new instance of the MetricSet -// Part of new is also setting up the configuration by processing additional -// configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { logp.Warn("EXPERIMENTAL: The couchbase cluster metricset is experimental") - config := struct{}{} - - if err := base.Module().UnpackConfig(&config); err != nil { - return nil, err - } - return &MetricSet{ BaseMetricSet: base, - client: &http.Client{Timeout: base.Module().Config().Timeout}, + http: helper.NewHTTP(base), }, nil } @@ -61,22 +48,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() (common.MapStr, error) { - req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil) - if m.HostData().User != "" || m.HostData().Password != "" { - req.SetBasicAuth(m.HostData().User, m.HostData().Password) - } - - resp, err := m.client.Do(req) - + content, err := m.http.FetchContent() if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) - } - - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("Error Connecting to Couchbase %d: %s", resp.StatusCode, resp.Status) + return nil, err } - - return eventMapping(resp.Body), nil + return eventMapping(content), nil } diff --git a/metricbeat/module/couchbase/cluster/data.go b/metricbeat/module/couchbase/cluster/data.go index 9d885e7f8c9..a9d3e0dc6a8 100644 --- a/metricbeat/module/couchbase/cluster/data.go +++ b/metricbeat/module/couchbase/cluster/data.go @@ -2,7 +2,6 @@ package cluster import ( "encoding/json" - "io" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -41,10 +40,10 @@ type Data struct { MaxBucketCount int64 `json:"maxBucketCount"` } -func eventMapping(body io.Reader) common.MapStr { +func eventMapping(content []byte) common.MapStr { var d Data - err := json.NewDecoder(body).Decode(&d) + err := json.Unmarshal(content, &d) if err != nil { logp.Err("Error: ", err) } diff --git a/metricbeat/module/couchbase/node/data.go b/metricbeat/module/couchbase/node/data.go index bd27bbb49c8..c1d34e165ce 100644 --- a/metricbeat/module/couchbase/node/data.go +++ b/metricbeat/module/couchbase/node/data.go @@ -2,11 +2,11 @@ package node import ( "encoding/json" - "io" + + "strconv" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "strconv" ) type NodeSystemStats struct { @@ -56,10 +56,10 @@ type Data struct { Nodes []Node `json:"nodes"` } -func eventsMapping(body io.Reader) []common.MapStr { +func eventsMapping(content []byte) []common.MapStr { var d Data - err := json.NewDecoder(body).Decode(&d) + err := json.Unmarshal(content, &d) if err != nil { logp.Err("Error: ", err) } diff --git a/metricbeat/module/couchbase/node/node.go b/metricbeat/module/couchbase/node/node.go index b88d2ba0bed..fcef19ffc34 100644 --- a/metricbeat/module/couchbase/node/node.go +++ b/metricbeat/module/couchbase/node/node.go @@ -1,11 +1,9 @@ package node import ( - "fmt" - "net/http" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -31,29 +29,18 @@ func init() { } // MetricSet type defines all fields of the MetricSet -// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with -// additional entries. These variables can be used to persist data or configuration between -// multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - client *http.Client + http *helper.HTTP } // New create a new instance of the MetricSet -// Part of new is also setting up the configuration by processing additional -// configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { logp.Warn("EXPERIMENTAL: The couchbase node metricset is experimental") - config := struct{}{} - - if err := base.Module().UnpackConfig(&config); err != nil { - return nil, err - } - return &MetricSet{ BaseMetricSet: base, - client: &http.Client{Timeout: base.Module().Config().Timeout}, + http: helper.NewHTTP(base), }, nil } @@ -61,22 +48,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { - req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil) - if m.HostData().User != "" || m.HostData().Password != "" { - req.SetBasicAuth(m.HostData().User, m.HostData().Password) - } - - resp, err := m.client.Do(req) - + content, err := m.http.FetchContent() if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) - } - - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("Error Connecting to Couchbase %d: %s", resp.StatusCode, resp.Status) + return nil, err } - return eventsMapping(resp.Body), nil + return eventsMapping(content), nil } diff --git a/metricbeat/module/nginx/stubstatus/data.go b/metricbeat/module/nginx/stubstatus/data.go index f30ad0f9fc4..7f4ffd92f0b 100755 --- a/metricbeat/module/nginx/stubstatus/data.go +++ b/metricbeat/module/nginx/stubstatus/data.go @@ -3,7 +3,6 @@ package stubstatus import ( "bufio" "fmt" - "io" "regexp" "strconv" @@ -17,7 +16,7 @@ var ( ) // Map body to MapStr -func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset string) (common.MapStr, error) { +func eventMapping(scanner *bufio.Scanner, m *MetricSet) (common.MapStr, error) { // Nginx stub status sample: // Active connections: 1 // server accepts handled requests @@ -35,8 +34,6 @@ func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset s waiting int ) - scanner := bufio.NewScanner(body) - // Parse active connections. scanner.Scan() matches := activeRe.FindStringSubmatch(scanner.Text()) @@ -79,8 +76,7 @@ func eventMapping(m *MetricSet, body io.ReadCloser, hostname string, metricset s waiting, _ = strconv.Atoi(matches[3]) event := common.MapStr{ - "hostname": hostname, - + "hostname": m.Host(), "active": active, "accepts": accepts, "handled": handled, diff --git a/metricbeat/module/nginx/stubstatus/stubstatus.go b/metricbeat/module/nginx/stubstatus/stubstatus.go index 97764379403..3bf30b4efb6 100755 --- a/metricbeat/module/nginx/stubstatus/stubstatus.go +++ b/metricbeat/module/nginx/stubstatus/stubstatus.go @@ -2,11 +2,8 @@ package stubstatus import ( - "fmt" - "net/http" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -21,8 +18,6 @@ const ( ) var ( - debugf = logp.MakeDebug("nginx-status") - hostParser = parse.URLHostParserBuilder{ DefaultScheme: defaultScheme, PathConfigKey: "server_status_path", @@ -39,33 +34,24 @@ func init() { // MetricSet for fetching Nginx stub status. type MetricSet struct { mb.BaseMetricSet - client *http.Client // HTTP client that is reused across requests. - previousNumRequests int // Total number of requests as returned in the previous fetch. + http *helper.HTTP + previousNumRequests int // Total number of requests as returned in the previous fetch. } // New creates new instance of MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, - client: &http.Client{Timeout: base.Module().Config().Timeout}, + http: helper.NewHTTP(base), }, nil } // Fetch makes an HTTP request to fetch status metrics from the stubstatus endpoint. func (m *MetricSet) Fetch() (common.MapStr, error) { - req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil) - if m.HostData().User != "" || m.HostData().Password != "" { - req.SetBasicAuth(m.HostData().User, m.HostData().Password) - } - resp, err := m.client.Do(req) + scanner, err := m.http.FetchScanner() if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + return nil, err } - return eventMapping(m, resp.Body, m.Host(), m.Name()) + return eventMapping(scanner, m) } diff --git a/metricbeat/module/prometheus/collector/collector.go b/metricbeat/module/prometheus/collector/collector.go index ac7cc8e3786..44866382c57 100644 --- a/metricbeat/module/prometheus/collector/collector.go +++ b/metricbeat/module/prometheus/collector/collector.go @@ -1,12 +1,9 @@ package collector import ( - "bufio" - "fmt" - "net/http" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -17,8 +14,6 @@ const ( ) var ( - debugf = logp.MakeDebug("prometheus-collector") - hostParser = parse.URLHostParserBuilder{ DefaultScheme: defaultScheme, DefaultPath: defaultPath, @@ -34,7 +29,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet - client *http.Client + http *helper.HTTP namespace string } @@ -51,29 +46,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, - client: &http.Client{Timeout: base.Module().Config().Timeout}, + http: helper.NewHTTP(base), namespace: config.Namespace, }, nil } func (m *MetricSet) Fetch() ([]common.MapStr, error) { - req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil) - if m.HostData().User != "" || m.HostData().Password != "" { - req.SetBasicAuth(m.HostData().User, m.HostData().Password) - } - resp, err := m.client.Do(req) + scanner, err := m.http.FetchScanner() if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) + return nil, err } - eventList := map[string]common.MapStr{} - scanner := bufio.NewScanner(resp.Body) // Iterate through all events to gather data for scanner.Scan() { diff --git a/metricbeat/module/prometheus/stats/stats.go b/metricbeat/module/prometheus/stats/stats.go index 1429f6c76ba..661eae1df8b 100644 --- a/metricbeat/module/prometheus/stats/stats.go +++ b/metricbeat/module/prometheus/stats/stats.go @@ -1,13 +1,11 @@ package stats import ( - "bufio" - "fmt" - "net/http" "strings" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" ) @@ -18,8 +16,6 @@ const ( ) var ( - debugf = logp.MakeDebug("prometheus-stats") - hostParser = parse.URLHostParserBuilder{ DefaultScheme: defaultScheme, DefaultPath: defaultPath, @@ -34,7 +30,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet - client *http.Client + http *helper.HTTP } func New(base mb.BaseMetricSet) (mb.MetricSet, error) { @@ -42,27 +38,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, - client: &http.Client{Timeout: base.Module().Config().Timeout}, + http: helper.NewHTTP(base), }, nil } func (m *MetricSet) Fetch() (common.MapStr, error) { - req, err := http.NewRequest("GET", m.HostData().SanitizedURI, nil) - if m.HostData().User != "" || m.HostData().Password != "" { - req.SetBasicAuth(m.HostData().User, m.HostData().Password) - } - resp, err := m.client.Do(req) + scanner, err := m.http.FetchScanner() if err != nil { - return nil, fmt.Errorf("error making http request: %v", err) + return nil, err } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, fmt.Errorf("HTTP error %d: %s", resp.StatusCode, resp.Status) - } - - scanner := bufio.NewScanner(resp.Body) entries := map[string]interface{}{}