-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kubernetes input plugin PodLabels #6764
Changes from 3 commits
7da634d
b7be8b4
0dae34d
da1438e
43cafea
d81e986
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package kubernetes | |
import ( | ||
"encoding/json" | ||
"fmt" | ||
"github.com/influxdata/telegraf/filter" | ||
"io/ioutil" | ||
"net/http" | ||
"net/url" | ||
|
@@ -23,6 +24,11 @@ type Kubernetes struct { | |
BearerToken string `toml:"bearer_token"` | ||
BearerTokenString string `toml:"bearer_token_string"` | ||
|
||
LabelInclude []string `toml:"label_include"` | ||
LabelExclude []string `toml:"label_exclude"` | ||
|
||
labelFilter filter.Filter | ||
|
||
// HTTP Timeout specified as a string - 3s, 1m, 1h | ||
ResponseTimeout internal.Duration | ||
|
||
|
@@ -42,6 +48,11 @@ var sampleConfig = ` | |
## OR | ||
# bearer_token_string = "abc_123" | ||
|
||
# Labels to include and exclude | ||
# An empty array for include and exclude will include all labels | ||
label_include = [] | ||
label_exclude = [] | ||
|
||
## Set response_timeout (default 5 seconds) | ||
# response_timeout = "5s" | ||
|
||
|
@@ -75,6 +86,7 @@ func (k *Kubernetes) Description() string { | |
} | ||
|
||
func (k *Kubernetes) Init() error { | ||
|
||
// If neither are provided, use the default service account. | ||
if k.BearerToken == "" && k.BearerTokenString == "" { | ||
k.BearerToken = defaultServiceAccountPath | ||
|
@@ -88,6 +100,12 @@ func (k *Kubernetes) Init() error { | |
k.BearerTokenString = strings.TrimSpace(string(token)) | ||
} | ||
|
||
labelFilter, err := filter.NewIncludeExcludeFilter(k.LabelInclude, k.LabelExclude) | ||
if err != nil { | ||
return err | ||
} | ||
k.labelFilter = labelFilter | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -107,48 +125,19 @@ func buildURL(endpoint string, base string) (*url.URL, error) { | |
} | ||
|
||
func (k *Kubernetes) gatherSummary(baseURL string, acc telegraf.Accumulator) error { | ||
url := fmt.Sprintf("%s/stats/summary", baseURL) | ||
var req, err = http.NewRequest("GET", url, nil) | ||
var resp *http.Response | ||
|
||
tlsCfg, err := k.ClientConfig.TLSConfig() | ||
summaryMetrics := &SummaryMetrics{} | ||
err := k.LoadJson(fmt.Sprintf("%s/stats/summary", baseURL), summaryMetrics) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if k.RoundTripper == nil { | ||
// Set default values | ||
if k.ResponseTimeout.Duration < time.Second { | ||
k.ResponseTimeout.Duration = time.Second * 5 | ||
} | ||
k.RoundTripper = &http.Transport{ | ||
TLSHandshakeTimeout: 5 * time.Second, | ||
TLSClientConfig: tlsCfg, | ||
ResponseHeaderTimeout: k.ResponseTimeout.Duration, | ||
} | ||
} | ||
|
||
req.Header.Set("Authorization", "Bearer "+k.BearerTokenString) | ||
req.Header.Add("Accept", "application/json") | ||
|
||
resp, err = k.RoundTripper.RoundTrip(req) | ||
if err != nil { | ||
return fmt.Errorf("error making HTTP request to %s: %s", url, err) | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) | ||
} | ||
|
||
summaryMetrics := &SummaryMetrics{} | ||
err = json.NewDecoder(resp.Body).Decode(summaryMetrics) | ||
podInfos, err := k.gatherPodInfo(baseURL) | ||
if err != nil { | ||
return fmt.Errorf(`Error parsing response: %s`, err) | ||
return err | ||
} | ||
buildSystemContainerMetrics(summaryMetrics, acc) | ||
buildNodeMetrics(summaryMetrics, acc) | ||
buildPodMetrics(summaryMetrics, acc) | ||
buildPodMetrics(baseURL, summaryMetrics, podInfos, acc) | ||
return nil | ||
} | ||
|
||
|
@@ -200,7 +189,72 @@ func buildNodeMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) | |
acc.AddFields("kubernetes_node", fields, tags) | ||
} | ||
|
||
func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) { | ||
func (k *Kubernetes) gatherPodInfo(baseURL string) ([]PodInfo, error) { | ||
var podapi Podlist | ||
err := k.LoadJson(fmt.Sprintf("%s/pods", baseURL), &podapi) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var podinfo []PodInfo | ||
|
||
for i := 0; i < len(podapi.Items); i++ { | ||
var meta PodInfo | ||
err = json.Unmarshal(podapi.Items[i]["metadata"], &meta) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add the PodInfo to the Podlist object, then you can skip this second phase of parsing. Keep all the labels at this point and we can see if they match the filter later when we add the tags. |
||
if err != nil { | ||
fmt.Printf(`Error parsing response: %s\n`, err) | ||
return nil, fmt.Errorf(`Error parsing response: %s`, err) | ||
} | ||
for key := range meta.Labels { | ||
if !k.labelFilter.Match(key) { | ||
delete(meta.Labels, key) | ||
} | ||
} | ||
podinfo = append(podinfo, meta) | ||
|
||
} | ||
|
||
return podinfo, nil | ||
|
||
} | ||
|
||
func (k *Kubernetes) LoadJson(url string, v interface{}) error { | ||
var req, err = http.NewRequest("GET", url, nil) | ||
var resp *http.Response | ||
tlsCfg, err := k.ClientConfig.TLSConfig() | ||
if err != nil { | ||
return err | ||
} | ||
if k.RoundTripper == nil { | ||
if k.ResponseTimeout.Duration < time.Second { | ||
k.ResponseTimeout.Duration = time.Second * 5 | ||
} | ||
k.RoundTripper = &http.Transport{ | ||
TLSHandshakeTimeout: 5 * time.Second, | ||
TLSClientConfig: tlsCfg, | ||
ResponseHeaderTimeout: k.ResponseTimeout.Duration, | ||
} | ||
} | ||
req.Header.Set("Authorization", "Bearer "+k.BearerTokenString) | ||
req.Header.Add("Accept", "application/json") | ||
resp, err = k.RoundTripper.RoundTrip(req) | ||
if err != nil { | ||
return fmt.Errorf("error making HTTP request to %s: %s", url, err) | ||
} | ||
defer resp.Body.Close() | ||
if resp.StatusCode != http.StatusOK { | ||
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status) | ||
} | ||
|
||
err = json.NewDecoder(resp.Body).Decode(v) | ||
if err != nil { | ||
return fmt.Errorf(`Error parsing response: %s`, err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func buildPodMetrics(baseURL string, summaryMetrics *SummaryMetrics, podInfo []PodInfo, acc telegraf.Accumulator) { | ||
for _, pod := range summaryMetrics.Pods { | ||
for _, container := range pod.Containers { | ||
tags := map[string]string{ | ||
|
@@ -209,6 +263,14 @@ func buildPodMetrics(summaryMetrics *SummaryMetrics, acc telegraf.Accumulator) { | |
"container_name": container.Name, | ||
"pod_name": pod.PodRef.Name, | ||
} | ||
for i := range podInfo { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the named range variables: |
||
if podInfo[i].Name == pod.PodRef.Name && podInfo[i].NameSpace == pod.PodRef.Namespace { | ||
for k, v := range podInfo[i].Labels { | ||
tags[k] = v | ||
} | ||
} | ||
} | ||
|
||
fields := make(map[string]interface{}) | ||
fields["cpu_usage_nanocores"] = container.CPU.UsageNanoCores | ||
fields["cpu_usage_core_nanoseconds"] = container.CPU.UsageCoreNanoSeconds | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package kubernetes | ||
|
||
import "encoding/json" | ||
|
||
type Podlist struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest naming this |
||
Kind string `json:"kind"` | ||
ApiVersion string `json:"apiVersion"` | ||
Items []map[string]json.RawMessage `json:"items"` | ||
} | ||
|
||
type PodInfo struct { | ||
Name string `json:"name"` | ||
NameSpace string `json:"namespace"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: |
||
Labels map[string]string `json:"labels"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This deviates from other existing plugins, but in order to avoid adding unwanted data I think we need to default to exclude all labels. In order to make this work, initialize the exclude filter in the
init()
function"*"
:Then, comment out the default values here:
If a user defines them again like so it will still include all, this is okay: