Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes input plugin PodLabels #6764

Merged
merged 6 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion plugins/inputs/kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kubernetes Input Plugin

This input plugin talks to the kubelet api using the `/stats/summary` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.
This input plugin talks to the kubelet api using the `/stats/summary` and `/pods` endpoints to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.

To find the ip address of the host you are running on you can issue a command like the following:

Expand Down Expand Up @@ -44,6 +44,11 @@ avoid cardinality issues:
## OR
# bearer_token_string = "abc_123"

# Labels to include and exclude
# An empty array for include and exclude will include all labels
#label_include = []
#label_exclude = []

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

Expand Down
132 changes: 97 additions & 35 deletions plugins/inputs/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"github.com/influxdata/telegraf/filter"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -23,6 +24,11 @@ type Kubernetes struct {
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`

LabelInclude []string `toml:"label_include"`
LabelExclude []string `toml:"label_exclude"`

labelFilter filter.Filter

// HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout internal.Duration

Expand All @@ -42,6 +48,11 @@ var sampleConfig = `
## OR
# bearer_token_string = "abc_123"

# Labels to include and exclude
# An empty array for include and exclude will include all labels
label_include = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deviates from other existing plugins, but in order to avoid adding unwanted data I think we need to default to excluding all labels. To make this work, initialize the exclude filter to "*" in the init() function:

// init registers the "kubernetes" input. The factory returns a Kubernetes
// whose LabelInclude is empty and whose LabelExclude is ["*"], i.e. the
// suggested default of excluding all labels.
func init() {
	inputs.Add("kubernetes", func() telegraf.Input {
		return &Kubernetes{
			LabelInclude: []string{},
			LabelExclude: []string{"*"},
		}
	})
}

Then, comment out the default values here:

# label_include = []
# label_exclude = ["*"]

If a user defines them again like so, all labels will still be included; this is okay:

label_include = []
label_exclude = []

label_exclude = []

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

Expand Down Expand Up @@ -75,6 +86,7 @@ func (k *Kubernetes) Description() string {
}

func (k *Kubernetes) Init() error {

// If neither are provided, use the default service account.
if k.BearerToken == "" && k.BearerTokenString == "" {
k.BearerToken = defaultServiceAccountPath
Expand All @@ -88,6 +100,12 @@ func (k *Kubernetes) Init() error {
k.BearerTokenString = strings.TrimSpace(string(token))
}

labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude)
if err != nil {
return err
}
k.labelFilter = labelFilter

return nil
}

Expand All @@ -107,48 +125,19 @@ func buildURL(endpoint string, base string) (*url.URL, error) {
}

func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
url := fmt.Sprintf("%s/stats/summary", baseURL)
var req, err = http.NewRequest("GET", url, nil)
var resp *http.Response

tlsCfg, err := k.ClientConfig.TLSConfig()
summaryMetrics := &SummaryMetrics{}
err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics)
if err != nil {
return err
}

if k.RoundTripper == nil {
// Set default values
if k.ResponseTimeout.Duration < time.Second {
k.ResponseTimeout.Duration = time.Second * 5
}
k.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
}
}

req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
req.Header.Add("Accept", "application/json")

resp, err = k.RoundTripper.RoundTrip(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}

summaryMetrics := &SummaryMetrics{}
err = json.NewDecoder(resp.Body).Decode(summaryMetrics)
podInfos, err := k.gatherPodInfo(baseURL)
if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err)
return err
}
buildSystemContainerMetrics(summaryMetrics, acc)
buildNodeMetrics(summaryMetrics, acc)
buildPodMetrics(summaryMetrics, acc)
buildPodMetrics(baseURL, summaryMetrics, podInfos, acc)
return nil
}

Expand Down Expand Up @@ -200,7 +189,72 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator)
acc.AddFields("kubernetes_node", fields, tags)
}

func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]PodInfo, error) {
var podapi Podlist
err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podapi)
if err != nil {
return nil, err
}

var podinfo []PodInfo

for i := 0; i < len(podapi.Items); i++ {
var meta PodInfo
err = json.Unmarshal(podapi.Items[i]["metadata"], &meta)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the PodInfo to the Podlist object, then you can skip this second phase of parsing. Keep all the labels at this point and we can see if they match the filter later when we add the tags.

if err != nil {
fmt.Printf(`Error parsing response: %s\n`, err)
return nil, fmt.Errorf(`Error parsing response: %s`, err)
}
for key := range meta.Labels {
if !k.labelFilter.Match(key) {
delete(meta.Labels, key)
}
}
podinfo = append(podinfo, meta)

}

return podinfo, nil

}

// LoadJson issues a GET request against url, sending the plugin's bearer
// token in the Authorization header, and decodes the JSON response body
// into v.
//
// When no RoundTripper has been set yet, it builds an http.Transport from
// the plugin's TLS client configuration; at that point a ResponseTimeout
// below one second is replaced by 5 seconds.
//
// It returns an error when the request cannot be constructed, the TLS
// configuration cannot be built, the round trip fails, the response status
// is not 200 OK, or the body cannot be decoded into v.
func (k *Kubernetes) LoadJson(url string, v interface{}) error {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		// http.NewRequest returns a nil request on failure (e.g. a malformed
		// URL); return here so req.Header is never dereferenced on nil.
		return err
	}

	tlsCfg, err := k.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}

	if k.RoundTripper == nil {
		// Set default values
		if k.ResponseTimeout.Duration < time.Second {
			k.ResponseTimeout.Duration = time.Second * 5
		}
		k.RoundTripper = &http.Transport{
			TLSHandshakeTimeout:   5 * time.Second,
			TLSClientConfig:       tlsCfg,
			ResponseHeaderTimeout: k.ResponseTimeout.Duration,
		}
	}

	req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
	req.Header.Add("Accept", "application/json")

	resp, err := k.RoundTripper.RoundTrip(req)
	if err != nil {
		return fmt.Errorf("error making HTTP request to %s: %s", url, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
	}

	err = json.NewDecoder(resp.Body).Decode(v)
	if err != nil {
		return fmt.Errorf(`Error parsing response: %s`, err)
	}

	return nil
}

func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []PodInfo, acc telegraf.Accumulator) {
for _, pod := range summaryMetrics.Pods {
for _, container := range pod.Containers {
tags := map[string]string{
Expand All @@ -209,6 +263,14 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
"container_name": container.Name,
"pod_name": pod.PodRef.Name,
}
for i := range podInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the named range variables: for _, info := range podInfo {

if podInfo[i].Name == pod.PodRef.Name && podInfo[i].NameSpace == pod.PodRef.Namespace {
for k, v := range podInfo[i].Labels {
tags[k] = v
}
}
}

fields := make(map[string]interface{})
fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores
fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds
Expand Down
15 changes: 15 additions & 0 deletions plugins/inputs/kubernetes/kubernetes_podlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package kubernetes

import "encoding/json"

type Podlist struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest naming this Pods since this contains a list.

Kind string `json:"kind"`
ApiVersion string `json:"apiVersion"`
Items []map[string]json.RawMessage `json:"items"`
}

type PodInfo struct {
Name string `json:"name"`
NameSpace string `json:"namespace"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: Namespace

Labels map[string]string `json:"labels"`
}
53 changes: 49 additions & 4 deletions plugins/inputs/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"github.com/influxdata/telegraf/filter"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -12,13 +13,23 @@ import (

func TestKubernetesStats(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, response)
if r.RequestURI == "/stats/summary" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responseStatsSummery)
}
if r.RequestURI == "/pods" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responsePods)
}

}))
defer ts.Close()

labelFilter, _ := filter.NewIncludeExcludeFilter([]string{"app", "superkey"}, nil)

k := &Kubernetes{
URL: ts.URL,
URL: ts.URL,
labelFilter: labelFilter,
}

var acc testutil.Accumulator
Expand Down Expand Up @@ -89,6 +100,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "foocontainer",
"namespace": "foons",
"pod_name": "foopod",
"app": "foo",
"superkey": "foobar",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)

Expand All @@ -112,6 +125,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "stopped-container",
"namespace": "foons",
"pod_name": "stopped-pod",
"app": "foo-stop",
"superkey": "superfoo",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)

Expand Down Expand Up @@ -143,7 +158,37 @@ func TestKubernetesStats(t *testing.T) {

}

var response = `
// responsePods is the canned JSON body the test HTTP server writes for
// requests to "/pods": a PodList with two pods ("foopod" and "stopped-pod")
// in namespace "foons", each carrying "superkey" and "app" labels.
var responsePods = `
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {},
"items": [
{
"metadata": {
"name": "foopod",
"namespace": "foons",
"labels": {
"superkey": "foobar",
"app": "foo"
}
}
},
{
"metadata": {
"name": "stopped-pod",
"namespace": "foons",
"labels": {
"superkey": "superfoo",
"app": "foo-stop"
}
}
}
]
}
`

var responseStatsSummery = `
{
"node": {
"nodeName": "node1",
Expand Down