Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch Input changes #6004

Merged
merged 5 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 52 additions & 22 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,16 @@ type Elasticsearch struct {
NodeStats []string
tls.ClientConfig

client *http.Client
catMasterResponseTokens []string
isMaster bool
client *http.Client
serverInfo map[string]serverInfo
}
type serverInfo struct {
nodeID string
masterID string
}

func (i serverInfo) isMaster() bool {
return i.nodeID == i.masterID
}

// NewElasticsearch return a new instance of Elasticsearch
Expand Down Expand Up @@ -186,25 +193,37 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
e.client = client
}

var wg sync.WaitGroup
wg.Add(len(e.Servers))
if e.ClusterStats {
e.serverInfo = make(map[string]serverInfo)
for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
info := serverInfo{}

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)
e.isMaster = false
// Gather node ID
if err := e.gatherNodeID(&info, s+"/_nodes/_local/name"); err != nil {
dupondje marked this conversation as resolved.
Show resolved Hide resolved
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}

if e.ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
if err := e.setCatMaster(s + "/_cat/master"); err != nil {
if err := e.setCatMaster(&info, s+"/_cat/master"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}
}(serv, acc)
}
}

// Always gather node states
var wg sync.WaitGroup
wg.Add(len(e.Servers))

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)

// Always gather node stats
if err := e.gatherNodeStats(url, acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand All @@ -221,7 +240,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}

if e.ClusterStats && (e.isMaster || !e.ClusterStatsOnlyFromMaster || !e.Local) {
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
dupondje marked this conversation as resolved.
Show resolved Hide resolved
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand Down Expand Up @@ -267,6 +286,22 @@ func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
}

func (e *Elasticsearch) gatherNodeID(info *serverInfo, url string) error {
dupondje marked this conversation as resolved.
Show resolved Hide resolved
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*nodeStat `json:"nodes"`
}{}
if err := e.gatherJsonData(url, nodeStats); err != nil {
return err
}

// Only 1 should be returned
for id := range nodeStats.Nodes {
info.nodeID = id
}
return nil
}

func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Expand All @@ -284,11 +319,6 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
"cluster_name": nodeStats.ClusterName,
}

if e.ClusterStats {
// check for master
e.isMaster = (id == e.catMasterResponseTokens[0])
}

for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
Expand Down Expand Up @@ -405,7 +435,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
return nil
}

func (e *Elasticsearch) setCatMaster(url string) error {
func (e *Elasticsearch) setCatMaster(info *serverInfo, url string) error {
r, err := e.client.Get(url)
if err != nil {
return err
Expand All @@ -423,7 +453,7 @@ func (e *Elasticsearch) setCatMaster(url string) error {
return err
}

e.catMasterResponseTokens = strings.Split(string(response), " ")
info.masterID = strings.Split(string(response), " ")[0]

return nil
}
Expand Down
52 changes: 38 additions & 14 deletions plugins/inputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf/testutil"

"fmt"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -22,6 +23,9 @@ func defaultTags() map[string]string {
"node_host": "test",
}
}
func defaultServerInfo() serverInfo {
return serverInfo{nodeID: "", masterID: "SDFsfSDFsdfFSDSDfSFDSDF"}
}

type transportMock struct {
statusCode int
Expand Down Expand Up @@ -49,8 +53,8 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
func (t *transportMock) CancelRequest(_ *http.Request) {
}

func checkIsMaster(es *Elasticsearch, expected bool, t *testing.T) {
if es.isMaster != expected {
func checkIsMaster(es *Elasticsearch, server string, expected bool, t *testing.T) {
if es.serverInfo[server].isMaster() != expected {
msg := fmt.Sprintf("IsMaster set incorrectly")
assert.Fail(t, msg)
}
Expand All @@ -73,13 +77,15 @@ func TestGather(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()

var acc testutil.Accumulator
if err := acc.GatherError(es.Gather); err != nil {
t.Fatal(err)
}

checkIsMaster(es, false, t)
checkIsMaster(es, es.Servers[0], false, t)
checkNodeStatsResult(t, &acc)
}

Expand All @@ -88,13 +94,15 @@ func TestGatherIndividualStats(t *testing.T) {
es.Servers = []string{"http://example.com:9200"}
es.NodeStats = []string{"jvm", "process"}
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponseJVMProcess)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()

var acc testutil.Accumulator
if err := acc.GatherError(es.Gather); err != nil {
t.Fatal(err)
}

checkIsMaster(es, false, t)
checkIsMaster(es, es.Servers[0], false, t)

tags := defaultTags()
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
Expand All @@ -112,13 +120,15 @@ func TestGatherNodeStats(t *testing.T) {
es := newElasticsearchWithClient()
es.Servers = []string{"http://example.com:9200"}
es.client.Transport = newTransportMock(http.StatusOK, nodeStatsResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()

var acc testutil.Accumulator
if err := es.gatherNodeStats("junk", &acc); err != nil {
t.Fatal(err)
}

checkIsMaster(es, false, t)
checkIsMaster(es, es.Servers[0], false, t)
checkNodeStatsResult(t, &acc)
}

Expand All @@ -128,11 +138,13 @@ func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) {
es.ClusterHealth = true
es.ClusterHealthLevel = ""
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()

var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))

checkIsMaster(es, false, t)
checkIsMaster(es, es.Servers[0], false, t)

acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
Expand All @@ -153,11 +165,13 @@ func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) {
es.ClusterHealth = true
es.ClusterHealthLevel = "cluster"
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()

var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))

checkIsMaster(es, false, t)
checkIsMaster(es, es.Servers[0], false, t)

acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
Expand All @@ -178,11 +192,13 @@ func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) {
es.ClusterHealth = true
es.ClusterHealthLevel = "indices"
es.client.Transport = newTransportMock(http.StatusOK, clusterHealthResponseWithIndices)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()

var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))

checkIsMaster(es, false, t)
checkIsMaster(es, es.Servers[0], false, t)

acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
Expand All @@ -202,13 +218,17 @@ func TestGatherClusterStatsMaster(t *testing.T) {
es := newElasticsearchWithClient()
es.ClusterStats = true
es.Servers = []string{"http://example.com:9200"}
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}

// first get catMaster
es.client.Transport = newTransportMock(http.StatusOK, IsMasterResult)
require.NoError(t, es.setCatMaster("junk"))
info := es.serverInfo["http://example.com:9200"]
require.NoError(t, es.setCatMaster(&info, "junk"))
es.serverInfo["http://example.com:9200"] = info

IsMasterResultTokens := strings.Split(string(IsMasterResult), " ")
if es.catMasterResponseTokens[0] != IsMasterResultTokens[0] {
if es.serverInfo["http://example.com:9200"].masterID != IsMasterResultTokens[0] {
msg := fmt.Sprintf("catmaster is incorrect")
assert.Fail(t, msg)
}
Expand All @@ -221,7 +241,7 @@ func TestGatherClusterStatsMaster(t *testing.T) {
t.Fatal(err)
}

checkIsMaster(es, true, t)
checkIsMaster(es, es.Servers[0], true, t)
checkNodeStatsResult(t, &acc)

// now test the clusterstats method
Expand All @@ -243,13 +263,17 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
es := newElasticsearchWithClient()
es.ClusterStats = true
es.Servers = []string{"http://example.com:9200"}
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = serverInfo{nodeID: "SDFsfSDFsdfFSDSDfSFDSDF", masterID: ""}

// first get catMaster
es.client.Transport = newTransportMock(http.StatusOK, IsNotMasterResult)
require.NoError(t, es.setCatMaster("junk"))
info := es.serverInfo["http://example.com:9200"]
require.NoError(t, es.setCatMaster(&info, "junk"))
es.serverInfo["http://example.com:9200"] = info

IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ")
if es.catMasterResponseTokens[0] != IsNotMasterResultTokens[0] {
if es.serverInfo["http://example.com:9200"].masterID != IsNotMasterResultTokens[0] {
msg := fmt.Sprintf("catmaster is incorrect")
assert.Fail(t, msg)
}
Expand All @@ -263,7 +287,7 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
}

// ensure flag is clear so Cluster Stats would not be done
checkIsMaster(es, false, t)
checkIsMaster(es, es.Servers[0], false, t)
checkNodeStatsResult(t, &acc)
}

Expand Down
Loading