Skip to content

Commit

Permalink
Fix master check and move cluster health indices to separate measurem…
Browse files Browse the repository at this point in the history
…ent (#6004)
  • Loading branch information
dupondje authored and danielnelson committed Jun 25, 2019
1 parent 26ee42c commit 83c8d7b
Show file tree
Hide file tree
Showing 3 changed files with 556 additions and 74 deletions.
153 changes: 99 additions & 54 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,32 @@ type nodeStat struct {
}

type clusterHealth struct {
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
TimedOut bool `json:"timed_out"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
ClusterName string `json:"cluster_name"`
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
TimedOut bool `json:"timed_out"`
UnassignedShards int `json:"unassigned_shards"`
Indices map[string]indexHealth `json:"indices"`
}

type indexHealth struct {
Status string `json:"status"`
NumberOfShards int `json:"number_of_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
NumberOfShards int `json:"number_of_shards"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
UnassignedShards int `json:"unassigned_shards"`
}

Expand Down Expand Up @@ -137,9 +138,17 @@ type Elasticsearch struct {
NodeStats []string
tls.ClientConfig

client *http.Client
catMasterResponseTokens []string
isMaster bool
client *http.Client
serverInfo map[string]serverInfo
serverInfoMutex sync.Mutex
}
type serverInfo struct {
nodeID string
masterID string
}

func (i serverInfo) isMaster() bool {
return i.nodeID == i.masterID
}

// NewElasticsearch return a new instance of Elasticsearch
Expand Down Expand Up @@ -186,25 +195,49 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
e.client = client
}

var wg sync.WaitGroup
wg.Add(len(e.Servers))
if e.ClusterStats {
var wgC sync.WaitGroup
wgC.Add(len(e.Servers))

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)
e.isMaster = false
e.serverInfo = make(map[string]serverInfo)
for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wgC.Done()
info := serverInfo{}

var err error

// Gather node ID
if info.nodeID, err = e.gatherNodeID(s + "/_nodes/_local/name"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}

if e.ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
if err := e.setCatMaster(s + "/_cat/master"); err != nil {
if info.masterID, err = e.getCatMaster(s + "/_cat/master"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}

// Always gather node states
e.serverInfoMutex.Lock()
e.serverInfo[s] = info
e.serverInfoMutex.Unlock()

}(serv, acc)
}
wgC.Wait()
}

var wg sync.WaitGroup
wg.Add(len(e.Servers))

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)

// Always gather node stats
if err := e.gatherNodeStats(url, acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand All @@ -221,7 +254,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}

if e.ClusterStats && (e.isMaster || !e.ClusterStatsOnlyFromMaster || !e.Local) {
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand Down Expand Up @@ -267,6 +300,22 @@ func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
}

func (e *Elasticsearch) gatherNodeID(url string) (string, error) {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*nodeStat `json:"nodes"`
}{}
if err := e.gatherJsonData(url, nodeStats); err != nil {
return "", err
}

// Only 1 should be returned
for id := range nodeStats.Nodes {
return id, nil
}
return "", nil
}

func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Expand All @@ -284,11 +333,6 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
"cluster_name": nodeStats.ClusterName,
}

if e.ClusterStats {
// check for master
e.isMaster = (id == e.catMasterResponseTokens[0])
}

for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
Expand Down Expand Up @@ -331,20 +375,21 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
}
measurementTime := time.Now()
clusterFields := map[string]interface{}{
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"timed_out": healthStats.TimedOut,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"active_primary_shards": healthStats.ActivePrimaryShards,
"active_shards": healthStats.ActiveShards,
"relocating_shards": healthStats.RelocatingShards,
"initializing_shards": healthStats.InitializingShards,
"unassigned_shards": healthStats.UnassignedShards,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"delayed_unassigned_shards": healthStats.DelayedUnassignedShards,
"initializing_shards": healthStats.InitializingShards,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_pending_tasks": healthStats.NumberOfPendingTasks,
"relocating_shards": healthStats.RelocatingShards,
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"timed_out": healthStats.TimedOut,
"unassigned_shards": healthStats.UnassignedShards,
}
acc.AddFields(
"elasticsearch_cluster_health",
Expand All @@ -355,18 +400,18 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator

for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"number_of_shards": health.NumberOfShards,
"number_of_replicas": health.NumberOfReplicas,
"active_primary_shards": health.ActivePrimaryShards,
"active_shards": health.ActiveShards,
"relocating_shards": health.RelocatingShards,
"initializing_shards": health.InitializingShards,
"number_of_replicas": health.NumberOfReplicas,
"number_of_shards": health.NumberOfShards,
"relocating_shards": health.RelocatingShards,
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"unassigned_shards": health.UnassignedShards,
}
acc.AddFields(
"elasticsearch_indices",
"elasticsearch_cluster_health_indices",
indexFields,
map[string]string{"index": name, "name": healthStats.ClusterName},
measurementTime,
Expand Down Expand Up @@ -405,27 +450,27 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
return nil
}

func (e *Elasticsearch) setCatMaster(url string) error {
func (e *Elasticsearch) getCatMaster(url string) (string, error) {
r, err := e.client.Get(url)
if err != nil {
return err
return "", err
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for
// future calls.
return fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
return "", fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
}
response, err := ioutil.ReadAll(r.Body)

if err != nil {
return err
return "", err
}

e.catMasterResponseTokens = strings.Split(string(response), " ")
masterID := strings.Split(string(response), " ")[0]

return nil
return masterID, nil
}

func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
Expand Down
Loading

0 comments on commit 83c8d7b

Please sign in to comment.