Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch Input changes #6004

Merged
merged 5 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 81 additions & 49 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,32 @@ type nodeStat struct {
}

type clusterHealth struct {
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
TimedOut bool `json:"timed_out"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
ClusterName string `json:"cluster_name"`
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
TimedOut bool `json:"timed_out"`
UnassignedShards int `json:"unassigned_shards"`
Indices map[string]indexHealth `json:"indices"`
}

type indexHealth struct {
Status string `json:"status"`
NumberOfShards int `json:"number_of_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
NumberOfShards int `json:"number_of_shards"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
UnassignedShards int `json:"unassigned_shards"`
}

Expand Down Expand Up @@ -137,9 +138,16 @@ type Elasticsearch struct {
NodeStats []string
tls.ClientConfig

client *http.Client
catMasterResponseTokens []string
isMaster bool
client *http.Client
serverInfo map[string]serverInfo
}
type serverInfo struct {
nodeID string
masterID string
}

func (i serverInfo) isMaster() bool {
return i.nodeID == i.masterID
}

// NewElasticsearch return a new instance of Elasticsearch
Expand Down Expand Up @@ -186,25 +194,37 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
e.client = client
}

var wg sync.WaitGroup
wg.Add(len(e.Servers))
if e.ClusterStats {
e.serverInfo = make(map[string]serverInfo)
for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
info := serverInfo{}

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)
e.isMaster = false
// Gather node ID
if err := e.gatherNodeID(&info, s+"/_nodes/_local/name"); err != nil {
dupondje marked this conversation as resolved.
Show resolved Hide resolved
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}

if e.ClusterStats {
// get cat/master information here so NodeStats can determine
// whether this node is the Master
if err := e.setCatMaster(s + "/_cat/master"); err != nil {
if err := e.setCatMaster(&info, s+"/_cat/master"); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
}
}
}(serv, acc)
}
}

// Always gather node states
var wg sync.WaitGroup
wg.Add(len(e.Servers))

for _, serv := range e.Servers {
go func(s string, acc telegraf.Accumulator) {
defer wg.Done()
url := e.nodeStatsUrl(s)

// Always gather node stats
if err := e.gatherNodeStats(url, acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand All @@ -221,7 +241,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}
}

if e.ClusterStats && (e.isMaster || !e.ClusterStatsOnlyFromMaster || !e.Local) {
if e.ClusterStats && (e.serverInfo[s].isMaster() || !e.ClusterStatsOnlyFromMaster || !e.Local) {
dupondje marked this conversation as resolved.
Show resolved Hide resolved
if err := e.gatherClusterStats(s+"/_cluster/stats", acc); err != nil {
acc.AddError(fmt.Errorf(mask.ReplaceAllString(err.Error(), "http(s)://XXX:XXX@")))
return
Expand Down Expand Up @@ -267,6 +287,22 @@ func (e *Elasticsearch) nodeStatsUrl(baseUrl string) string {
return fmt.Sprintf("%s/%s", url, strings.Join(e.NodeStats, ","))
}

func (e *Elasticsearch) gatherNodeID(info *serverInfo, url string) error {
dupondje marked this conversation as resolved.
Show resolved Hide resolved
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]*nodeStat `json:"nodes"`
}{}
if err := e.gatherJsonData(url, nodeStats); err != nil {
return err
}

// Only 1 should be returned
for id := range nodeStats.Nodes {
info.nodeID = id
}
return nil
}

func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
nodeStats := &struct {
ClusterName string `json:"cluster_name"`
Expand All @@ -284,11 +320,6 @@ func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) er
"cluster_name": nodeStats.ClusterName,
}

if e.ClusterStats {
// check for master
e.isMaster = (id == e.catMasterResponseTokens[0])
}

for k, v := range n.Attributes {
tags["node_attribute_"+k] = v
}
Expand Down Expand Up @@ -331,20 +362,21 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
}
measurementTime := time.Now()
clusterFields := map[string]interface{}{
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"timed_out": healthStats.TimedOut,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"active_primary_shards": healthStats.ActivePrimaryShards,
"active_shards": healthStats.ActiveShards,
"relocating_shards": healthStats.RelocatingShards,
"initializing_shards": healthStats.InitializingShards,
"unassigned_shards": healthStats.UnassignedShards,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"delayed_unassigned_shards": healthStats.DelayedUnassignedShards,
"initializing_shards": healthStats.InitializingShards,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_pending_tasks": healthStats.NumberOfPendingTasks,
"relocating_shards": healthStats.RelocatingShards,
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"timed_out": healthStats.TimedOut,
"unassigned_shards": healthStats.UnassignedShards,
}
acc.AddFields(
"elasticsearch_cluster_health",
Expand All @@ -355,14 +387,14 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator

for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"number_of_shards": health.NumberOfShards,
"number_of_replicas": health.NumberOfReplicas,
"active_primary_shards": health.ActivePrimaryShards,
"active_shards": health.ActiveShards,
"relocating_shards": health.RelocatingShards,
"initializing_shards": health.InitializingShards,
"number_of_replicas": health.NumberOfReplicas,
"number_of_shards": health.NumberOfShards,
"relocating_shards": health.RelocatingShards,
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"unassigned_shards": health.UnassignedShards,
}
acc.AddFields(
Expand Down Expand Up @@ -405,7 +437,7 @@ func (e *Elasticsearch) gatherClusterStats(url string, acc telegraf.Accumulator)
return nil
}

func (e *Elasticsearch) setCatMaster(url string) error {
func (e *Elasticsearch) setCatMaster(info *serverInfo, url string) error {
r, err := e.client.Get(url)
if err != nil {
return err
Expand All @@ -423,7 +455,7 @@ func (e *Elasticsearch) setCatMaster(url string) error {
return err
}

e.catMasterResponseTokens = strings.Split(string(response), " ")
info.masterID = strings.Split(string(response), " ")[0]

return nil
}
Expand Down
Loading