Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch Input changes #6004

Merged
merged 5 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 94 additions & 54 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,32 @@ type nodeStat struct {
}

type clusterHealth struct {
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
TimedOut bool `json:"timed_out"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
ClusterName string `json:"cluster_name"`
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
TimedOut bool `json:"timed_out"`
UnassignedShards int `json:"unassigned_shards"`
Indices map[string]indexHealth `json:"indices"`
}

type indexHealth struct {
Status string `json:"status"`
NumberOfShards int `json:"number_of_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
NumberOfShards int `json:"number_of_shards"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
UnassignedShards int `json:"unassigned_shards"`
}

Expand Down Expand Up @@ -137,9 +138,17 @@ type Elasticsearch struct {
NodeStats []string
tls.ClientConfig

client *http.Client
catMasterResponseTokens []string
isMaster bool
client *http.Client
serverInfo map[string]serverInfo
serverInfoMutex sync.Mutex
}
// serverInfo holds the identifiers gathered for a single configured server:
// the ID of the node answering on that server and the ID of the node the
// cluster currently reports as master.
type serverInfo struct {
	// nodeID is the ID of the node reached through the server URL.
	nodeID string
	// masterID is the ID of the master node as reported by that server.
	masterID string
}

// isMaster reports whether the node described by i is the master node.
//
// An empty nodeID never counts as master. The zero value of serverInfo (which
// is what a lookup of a missing key in a map[string]serverInfo yields) has two
// empty IDs that would otherwise compare equal and wrongly mark a server whose
// IDs were never gathered as the master.
func (i serverInfo) isMaster() bool {
	return i.nodeID != "" && i.nodeID == i.masterID
}

// NewElasticsearch return a new instance of Elasticsearch
Expand Down Expand Up @@ -186,25 +195,44 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
e.client = client
}

var wg sync.WaitGroup
wg.Add(len(e.Servers))
if e.ClusterStats {
e.serverInfo = make(map[string]serverInfo)
for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
info := serverInfo{}

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)
e.isMaster = false
var err error

// Gather node ID
if info.nodeID, err = e.gatherNodeID(s + "/_nodes/_local/name"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}

if e.ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
if err := e.setCatMaster(s + "/_cat/master"); err != nil {
if info.masterID, err = e.getCatMaster(s + "/_cat/master"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}

// Always gather node states
e.serverInfoMutex.Lock()
e.serverInfo[s] = info
e.serverInfoMutex.Unlock()
dupondje marked this conversation as resolved.
Show resolved Hide resolved

}(serv, acc)
}
}

var wg sync.WaitGroup
wg.Add(len(e.Servers))

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)

// Always gather node stats
if err := e.gatherNodeStats(url, acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand All @@ -221,7 +249,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}

if e.ClusterStats && (e.isMaster || !e.ClusterStatsOnlyFromMaster || !e.Local) {
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
dupondje marked this conversation as resolved.
Show resolved Hide resolved
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand Down Expand Up @@ -267,6 +295,22 @@ func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
}

// gatherNodeID fetches the JSON document at url via gatherJsonData and returns
// one key of its "nodes" object, which is used as the node ID.
//
// Any error from gatherJsonData is returned unchanged together with an empty
// ID. When the "nodes" object is empty or absent, the result is an empty ID
// and a nil error.
func (e *Elasticsearch) gatherNodeID(url string) (string, error) {
	var resp struct {
		ClusterName string               `json:"cluster_name"`
		Nodes       map[string]*nodeStat `json:"nodes"`
	}

	err := e.gatherJsonData(url, &resp)
	if err != nil {
		return "", err
	}

	// The response is expected to hold a single node entry, so the first key
	// encountered is taken as the ID.
	nodeID := ""
	for key := range resp.Nodes {
		nodeID = key
		break
	}
	return nodeID, nil
}

func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Expand All @@ -284,11 +328,6 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
"cluster_name": nodeStats.ClusterName,
}

if e.ClusterStats {
// check for master
e.isMaster = (id == e.catMasterResponseTokens[0])
}

for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
Expand Down Expand Up @@ -331,20 +370,21 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
}
measurementTime := time.Now()
clusterFields := map[string]interface{}{
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"timed_out": healthStats.TimedOut,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"active_primary_shards": healthStats.ActivePrimaryShards,
"active_shards": healthStats.ActiveShards,
"relocating_shards": healthStats.RelocatingShards,
"initializing_shards": healthStats.InitializingShards,
"unassigned_shards": healthStats.UnassignedShards,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"delayed_unassigned_shards": healthStats.DelayedUnassignedShards,
"initializing_shards": healthStats.InitializingShards,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_pending_tasks": healthStats.NumberOfPendingTasks,
"relocating_shards": healthStats.RelocatingShards,
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"timed_out": healthStats.TimedOut,
"unassigned_shards": healthStats.UnassignedShards,
}
acc.AddFields(
"elasticsearch_cluster_health",
Expand All @@ -355,18 +395,18 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator

for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"number_of_shards": health.NumberOfShards,
"number_of_replicas": health.NumberOfReplicas,
"active_primary_shards": health.ActivePrimaryShards,
"active_shards": health.ActiveShards,
"relocating_shards": health.RelocatingShards,
"initializing_shards": health.InitializingShards,
"number_of_replicas": health.NumberOfReplicas,
"number_of_shards": health.NumberOfShards,
"relocating_shards": health.RelocatingShards,
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"unassigned_shards": health.UnassignedShards,
}
acc.AddFields(
"elasticsearch_indices",
"elasticsearch_cluster_health_indices",
indexFields,
map[string]string{"index": name, "name": healthStats.ClusterName},
measurementTime,
Expand Down Expand Up @@ -405,27 +445,27 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
return nil
}

func (e *Elasticsearch) setCatMaster(url string) error {
func (e *Elasticsearch) getCatMaster(url string) (string, error) {
r, err := e.client.Get(url)
if err != nil {
return err
return "", err
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
// NOTE: we are not going to read/discard r.Body under the assumption we'd prefer
// to let the underlying transport close the connection and re-establish a new one for
// future calls.
return fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
return "", fmt.Errorf("elasticsearch: Unable to retrieve master node information. API responded with status-code %d, expected %d", r.StatusCode, http.StatusOK)
}
response, err := ioutil.ReadAll(r.Body)

if err != nil {
return err
return "", err
}

e.catMasterResponseTokens = strings.Split(string(response), " ")
masterID := strings.Split(string(response), " ")[0]

return nil
return masterID, nil
}

func (e *Elasticsearch) gatherJsonData(url string, v interface{}) error {
Expand Down
Loading