Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes input plugin PodLabels #6764

Merged
merged 6 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion plugins/inputs/kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kubernetes Input Plugin

This input plugin talks to the kubelet api using the `/stats/summary` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.
This input plugin talks to the kubelet api using the `/stats/summary` and `/pods` endpoint to gather metrics about the running pods and containers for a single host. It is assumed that this plugin is running as part of a `daemonset` within a kubernetes installation. This means that telegraf is running on every node within the cluster. Therefore, you should configure this plugin to talk to its locally running kubelet.

To find the ip address of the host you are running on you can issue a command like the following:

Expand Down Expand Up @@ -44,6 +44,11 @@ avoid cardinality issues:
## OR
# bearer_token_string = "abc_123"

# Labels to include and exclude
# An empty array for include and exclude will include all labels
# label_include = []
# label_exclude = ["*"]

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

Expand Down
123 changes: 87 additions & 36 deletions plugins/inputs/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"encoding/json"
"fmt"
"github.com/influxdata/telegraf/filter"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -23,6 +24,11 @@ type Kubernetes struct {
BearerToken string `toml:"bearer_token"`
BearerTokenString string `toml:"bearer_token_string"`

LabelInclude []string `toml:"label_include"`
LabelExclude []string `toml:"label_exclude"`

labelFilter filter.Filter

// HTTP Timeout specified as a string - 3s, 1m, 1h
ResponseTimeout internal.Duration

Expand All @@ -42,6 +48,11 @@ var sampleConfig = `
## OR
# bearer_token_string = "abc_123"

# Labels to include and exclude
# An empty array for include and exclude will include all labels
# label_include = []
# label_exclude = ["*"]

## Set response_timeout (default 5 seconds)
# response_timeout = "5s"

Expand All @@ -60,7 +71,10 @@ const (

func init() {
inputs.Add("kubernetes", func() telegraf.Input {
return &Kubernetes{}
return &Kubernetes{
LabelInclude: []string{},
LabelExclude: []string{"*"},
}
})
}

Expand All @@ -75,6 +89,7 @@ func (k *Kubernetes) Description() string {
}

func (k *Kubernetes) Init() error {

// If neither are provided, use the default service account.
if k.BearerToken == "" && k.BearerTokenString == "" {
k.BearerToken = defaultServiceAccountPath
Expand All @@ -88,6 +103,12 @@ func (k *Kubernetes) Init() error {
k.BearerTokenString = strings.TrimSpace(string(token))
}

labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude)
if err != nil {
return err
}
k.labelFilter = labelFilter

return nil
}

Expand All @@ -107,48 +128,19 @@ func buildURL(endpoint string, base string) (*url.URL, error) {
}

func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error {
url := fmt.Sprintf("%s/stats/summary", baseURL)
var req, err = http.NewRequest("GET", url, nil)
var resp *http.Response

tlsCfg, err := k.ClientConfig.TLSConfig()
summaryMetrics := &SummaryMetrics{}
err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics)
if err != nil {
return err
}

if k.RoundTripper == nil {
// Set default values
if k.ResponseTimeout.Duration < time.Second {
k.ResponseTimeout.Duration = time.Second * 5
}
k.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
}
}

req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
req.Header.Add("Accept", "application/json")

resp, err = k.RoundTripper.RoundTrip(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}

summaryMetrics := &SummaryMetrics{}
err = json.NewDecoder(resp.Body).Decode(summaryMetrics)
podInfos, err := k.gatherPodInfo(baseURL)
if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err)
return err
}
buildSystemContainerMetrics(summaryMetrics, acc)
buildNodeMetrics(summaryMetrics, acc)
buildPodMetrics(summaryMetrics, acc)
buildPodMetrics(baseURL, summaryMetrics, podInfos, k.labelFilter, acc)
return nil
}

Expand Down Expand Up @@ -200,7 +192,56 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator)
acc.AddFields("kubernetes_node", fields, tags)
}

func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]Metadata, error) {
var podApi Pods
err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podApi)
if err != nil {
return nil, err
}
var podInfos []Metadata
for _, podMetadata := range podApi.Items {
podInfos = append(podInfos, podMetadata.Metadata)
}
return podInfos, nil
}

func (k *Kubernetes) LoadJson(url string, v interface{}) error {
var req, err = http.NewRequest("GET", url, nil)
var resp *http.Response
tlsCfg, err := k.ClientConfig.TLSConfig()
if err != nil {
return err
}
if k.RoundTripper == nil {
if k.ResponseTimeout.Duration < time.Second {
k.ResponseTimeout.Duration = time.Second * 5
}
k.RoundTripper = &http.Transport{
TLSHandshakeTimeout: 5 * time.Second,
TLSClientConfig: tlsCfg,
ResponseHeaderTimeout: k.ResponseTimeout.Duration,
}
}
req.Header.Set("Authorization", "Bearer "+k.BearerTokenString)
req.Header.Add("Accept", "application/json")
resp, err = k.RoundTripper.RoundTrip(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
}

err = json.NewDecoder(resp.Body).Decode(v)
if err != nil {
return fmt.Errorf(`Error parsing response: %s`, err)
}

return nil
}

func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []Metadata, labelFilter filter.Filter, acc telegraf.Accumulator) {
for _, pod := range summaryMetrics.Pods {
for _, container := range pod.Containers {
tags := map[string]string{
Expand All @@ -209,6 +250,16 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) {
"container_name": container.Name,
"pod_name": pod.PodRef.Name,
}
for _, info := range podInfo {
if info.Name == pod.PodRef.Name && info.Namespace == pod.PodRef.Namespace {
for k, v := range info.Labels {
if labelFilter.Match(k) {
tags[k] = v
}
}
}
}

fields := make(map[string]interface{})
fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores
fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds
Expand Down
17 changes: 17 additions & 0 deletions plugins/inputs/kubernetes/kubernetes_pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package kubernetes

type Pods struct {
Kind string `json:"kind"`
ApiVersion string `json:"apiVersion"`
Items []Item `json:"items"`
}

type Item struct {
Metadata Metadata `json:"metadata"`
}

type Metadata struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Labels map[string]string `json:"labels"`
}
55 changes: 51 additions & 4 deletions plugins/inputs/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"fmt"
"github.com/influxdata/telegraf/filter"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -12,13 +13,23 @@ import (

func TestKubernetesStats(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, response)
if r.RequestURI == "/stats/summary" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responseStatsSummery)
}
if r.RequestURI == "/pods" {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, responsePods)
}

}))
defer ts.Close()

labelFilter, _ := filter.NewIncludeExcludeFilter([]string{"app", "superkey"}, nil)

k := &Kubernetes{
URL: ts.URL,
URL: ts.URL,
labelFilter: labelFilter,
}

var acc testutil.Accumulator
Expand Down Expand Up @@ -89,6 +100,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "foocontainer",
"namespace": "foons",
"pod_name": "foopod",
"app": "foo",
"superkey": "foobar",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)

Expand All @@ -112,6 +125,8 @@ func TestKubernetesStats(t *testing.T) {
"container_name": "stopped-container",
"namespace": "foons",
"pod_name": "stopped-pod",
"app": "foo-stop",
"superkey": "superfoo",
}
acc.AssertContainsTaggedFields(t, "kubernetes_pod_container", fields, tags)

Expand Down Expand Up @@ -143,7 +158,39 @@ func TestKubernetesStats(t *testing.T) {

}

var response = `
var responsePods = `
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {},
"items": [
{
"metadata": {
"name": "foopod",
"namespace": "foons",
"labels": {
"superkey": "foobar",
"app": "foo",
"exclude": "exclude0"
}
}
},
{
"metadata": {
"name": "stopped-pod",
"namespace": "foons",
"labels": {
"superkey": "superfoo",
"app": "foo-stop",
"exclude": "exclude1"
}
}
}
]
}
`

var responseStatsSummery = `
{
"node": {
"nodeName": "node1",
Expand Down