From 56c90b669dd9c66f34398f983d37bc15d5ce33a3 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 17 Dec 2021 13:57:08 -0500 Subject: [PATCH 01/19] implement Elasticsearch client --- receiver/elasticsearchreceiver/client.go | 130 ++++ receiver/elasticsearchreceiver/client_test.go | 233 ++++++ .../internal/model/clusterhealth.go | 14 + .../internal/model/nodestats.go | 148 ++++ .../testdata/sample_payloads/health.json | 17 + .../testdata/sample_payloads/nodes_linux.json | 726 ++++++++++++++++++ .../sample_payloads/nodes_others.json | 269 +++++++ 7 files changed, 1537 insertions(+) create mode 100644 receiver/elasticsearchreceiver/client.go create mode 100644 receiver/elasticsearchreceiver/client_test.go create mode 100644 receiver/elasticsearchreceiver/internal/model/clusterhealth.go create mode 100644 receiver/elasticsearchreceiver/internal/model/nodestats.go create mode 100644 receiver/elasticsearchreceiver/testdata/sample_payloads/health.json create mode 100644 receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_linux.json create mode 100644 receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go new file mode 100644 index 000000000000..6214468c7347 --- /dev/null +++ b/receiver/elasticsearchreceiver/client.go @@ -0,0 +1,130 @@ +package elasticsearchreceiver + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" + "go.uber.org/zap" +) + +var ( + errUnauthenticated = errors.New("status 401, unauthenticated") + errUnauthorized = errors.New("status 403, unauthorized") +) + +// elasticsearchClient defines the interface to retreive metrics from an Elasticsearch cluster. +type elasticsearchClient interface { + NodeStats(ctx context.Context) (*model.NodeStats, error) + ClusterHealth(ctx context.Context) (*model.ClusterHealth, error) +} + +// defaultElasticsearchClient is the main implementation of elastisearchClient. +// It retrieves the required metrics from Elasticsearch's REST api. +type defaultElasticsearchClient struct { + client *http.Client + endpoint *url.URL + authHeader string + logger *zap.Logger +} + +var _ elasticsearchClient = (*defaultElasticsearchClient)(nil) + +func newElasticsearchClient(logger *zap.Logger, client *http.Client, endpoint *url.URL, username, password string) *defaultElasticsearchClient { + var authHeader string + if username != "" && password != "" { + userPass := fmt.Sprintf("%s:%s", username, password) + authb64 := base64.StdEncoding.EncodeToString([]byte(userPass)) + authHeader = fmt.Sprintf("Basic %s", authb64) + } + + return &defaultElasticsearchClient{ + client: client, + authHeader: authHeader, + endpoint: endpoint, + logger: logger, + } +} + +// nodeStatsMetrics is a comma separated list of metrics that will be gathered from NodeStats. +// The available metrics are documented here for elasticsearch 7.9: +// https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster-nodes-stats.html#cluster-nodes-stats-api-path-params +const nodeStatsMetrics = "indices,process,jvm,thread_pool,transport,http,fs" + +// nodeStatsIndexMetrics is a comma separated list of index metrics that will be gathered from NodeStats. +const nodeStatsIndexMetrics = "store,docs,indexing,get,search,merge,refresh,flush,warmer,query_cache,fielddata" + +func (c defaultElasticsearchClient) NodeStats(ctx context.Context) (*model.NodeStats, error) { + nodeStatsPath := fmt.Sprintf("_nodes/stats/%s/%s", nodeStatsMetrics, nodeStatsIndexMetrics) + + body, err := c.doRequest(ctx, nodeStatsPath) + if err != nil { + return nil, err + } + + nodeStats := model.NodeStats{} + err = json.Unmarshal(body, &nodeStats) + return &nodeStats, err +} + +func (c defaultElasticsearchClient) ClusterHealth(ctx context.Context) (*model.ClusterHealth, error) { + body, err := c.doRequest(ctx, "_cluster/health") + if err != nil { + return nil, err + } + + clusterHealth := model.ClusterHealth{} + err = json.Unmarshal(body, &clusterHealth) + return &clusterHealth, err +} + +func (c defaultElasticsearchClient) doRequest(ctx context.Context, path string) ([]byte, error) { + endpoint, err := c.endpoint.Parse(path) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint.String(), nil) + if err != nil { + return nil, err + } + + if c.authHeader != "" { + req.Header.Add("Authorization", c.authHeader) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, err := io.ReadAll(resp.Body) + c.logger.Debug( + "Request returned bad status code; Couldn't read request body.", + zap.String("path", path), + zap.Int("status_code", resp.StatusCode), + zap.ByteString("body", body), + zap.NamedError("body_read_error", err), + ) + } + + switch resp.StatusCode { + case 200: // OK + case 401: + return nil, errUnauthenticated + case 403: + return nil, errUnauthorized + default: + return nil, fmt.Errorf("got non 200 status code %d", resp.StatusCode) + } + + return io.ReadAll(resp.Body) +} diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go new file mode 100644 index 000000000000..23d469f59060 --- /dev/null +++ b/receiver/elasticsearchreceiver/client_test.go @@ -0,0 +1,233 @@ +package elasticsearchreceiver + +import ( + "context" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestNodeStatsNoPassword(t *testing.T) { + nodeJson, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") + require.NoError(t, err) + + actualNodeStats := model.NodeStats{} + require.NoError(t, json.Unmarshal(nodeJson, &actualNodeStats)) + + elasticsearchMock := mockServer(t, "", "") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") + ctx := context.Background() + nodeStats, err := client.NodeStats(ctx) + require.NoError(t, err) + + require.Equal(t, &actualNodeStats, nodeStats) +} + +func TestNodeStatsAuthentication(t *testing.T) { + nodeJson, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") + require.NoError(t, err) + + actualNodeStats := model.NodeStats{} + require.NoError(t, json.Unmarshal(nodeJson, &actualNodeStats)) + + username := "user" + password := "pass" + + elasticsearchMock := mockServer(t, username, password) + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, username, password) + ctx := context.Background() + nodeStats, err := client.NodeStats(ctx) + require.NoError(t, err) + + require.Equal(t, &actualNodeStats, nodeStats) +} + +func TestNodeStatsNoAuthentication(t *testing.T) { + elasticsearchMock := mockServer(t, "user", "pass") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") + ctx := context.Background() + _, err = client.NodeStats(ctx) + require.ErrorIs(t, err, errUnauthenticated) +} + +func TestNodeStatsBadAuthentication(t *testing.T) { + elasticsearchMock := mockServer(t, "user", "pass") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "unauthorized_user", "password") + ctx := context.Background() + _, err = client.NodeStats(ctx) + require.ErrorIs(t, err, errUnauthorized) +} + +func TestClusterHealthNoPassword(t *testing.T) { + healthJson, err := ioutil.ReadFile("./testdata/sample_payloads/health.json") + require.NoError(t, err) + + actualClusterHealth := model.ClusterHealth{} + require.NoError(t, json.Unmarshal(healthJson, &actualClusterHealth)) + + elasticsearchMock := mockServer(t, "", "") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") + ctx := context.Background() + nodeStats, err := client.ClusterHealth(ctx) + require.NoError(t, err) + + require.Equal(t, &actualClusterHealth, nodeStats) +} + +func TestClusterHealthAuthentication(t *testing.T) { + healthJson, err := ioutil.ReadFile("./testdata/sample_payloads/health.json") + require.NoError(t, err) + + actualClusterHealth := model.ClusterHealth{} + require.NoError(t, json.Unmarshal(healthJson, &actualClusterHealth)) + + username := "user" + password := "pass" + + elasticsearchMock := mockServer(t, username, password) + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, username, password) + ctx := context.Background() + nodeStats, err := client.ClusterHealth(ctx) + require.NoError(t, err) + + require.Equal(t, &actualClusterHealth, nodeStats) +} + +func TestClusterHealthNoAuthentication(t *testing.T) { + elasticsearchMock := mockServer(t, "user", "pass") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") + ctx := context.Background() + _, err = client.ClusterHealth(ctx) + require.ErrorIs(t, err, errUnauthenticated) +} + +func TestClusterHealthNoAuthorization(t *testing.T) { + elasticsearchMock := mockServer(t, "user", "pass") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "unauthorized_user", "password") + ctx := context.Background() + _, err = client.ClusterHealth(ctx) + require.ErrorIs(t, err, errUnauthorized) +} + +func TestDoRequestBadPath(t *testing.T) { + url, err := url.Parse("http://example.local:9200") + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "bad_username", "bad_password") + _, err = client.doRequest(context.Background(), "\x7f") + require.Error(t, err) +} + +func TestDoRequestClientTimeout(t *testing.T) { + url, err := url.Parse("http://example.local:9200") + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "bad_username", "bad_password") + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err = client.doRequest(ctx, "_cluster/health") + require.Error(t, err) +} + +func TestDoRequest404(t *testing.T) { + elasticsearchMock := mockServer(t, "", "") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") + + _, err = client.doRequest(context.Background(), "invalid_path") + require.Error(t, err) + require.Contains(t, err.Error(), "404") +} + +// mockServer gives a mock elasticsearch server for testing; if username or password is included, they will be required for the client. +// otherwise, authorization is ignored. +func mockServer(t *testing.T, username, password string) *httptest.Server { + nodes, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") + require.NoError(t, err) + health, err := ioutil.ReadFile("./testdata/sample_payloads/health.json") + require.NoError(t, err) + + elasticsearchMock := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + if username != "" || password != "" { + authUser, authPass, ok := req.BasicAuth() + if !ok { + rw.WriteHeader(401) + return + } else if authUser != username || authPass != password { + rw.WriteHeader(403) + return + } + } + + if strings.HasPrefix(req.URL.Path, "/_nodes/stats") { + rw.WriteHeader(200) + _, err = rw.Write(nodes) + require.NoError(t, err) + return + } + + if strings.HasPrefix(req.URL.Path, "/_cluster/health") { + rw.WriteHeader(200) + _, err = rw.Write(health) + require.NoError(t, err) + return + } + rw.WriteHeader(404) + })) + + return elasticsearchMock +} diff --git a/receiver/elasticsearchreceiver/internal/model/clusterhealth.go b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go new file mode 100644 index 000000000000..b81086d5d7b0 --- /dev/null +++ b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go @@ -0,0 +1,14 @@ +package model + +// ClusterHealth represents a response from elasticsearch's /_cluster/health endpoint. +// The struct is not exhaustive; It does not provide all values returned by elasticsearch, +// only the ones relevant to the metrics retrieved by the scraper. +type ClusterHealth struct { + ClusterName string `json:"cluster_name"` + ActiveShards int64 `json:"active_shards"` + RelocatingShards int64 `json:"relocating_shards"` + InitializingShards int64 `json:"initializing_shards"` + UnassignedShards int64 `json:"unassigned_shards"` + NodeCount int64 `json:"number_of_nodes"` + DataNodeCount int64 `json:"number_of_data_nodes"` +} diff --git a/receiver/elasticsearchreceiver/internal/model/nodestats.go b/receiver/elasticsearchreceiver/internal/model/nodestats.go new file mode 100644 index 000000000000..b6a618844979 --- /dev/null +++ b/receiver/elasticsearchreceiver/internal/model/nodestats.go @@ -0,0 +1,148 @@ +package model + +// NodeStats represents a response from elasticsearch's /_nodes/stats endpoint. +// The struct is not exhaustive; It does not provide all values returned by elasticsearch, +// only the ones relevant to the metrics retrieved by the scraper. +type NodeStats struct { + ClusterName string `json:"cluster_name"` + Nodes map[string]NodeStatsNodesInfo `json:"nodes"` +} + +type NodeStatsNodesInfo struct { + TimestampMsSinceEpoch int64 `json:"timestamp"` + Name string `json:"name"` + Indices NodeStatsNodesInfoIndices `json:"indices"` + ProcessStats ProcessStats `json:"process"` + JVMInfo JVMInfo `json:"jvm"` + ThreadPoolInfo map[string]ThreadPoolStats `json:"thread_pool"` + TransportStats TransportStats `json:"transport"` + HTTPStats HTTPStats `json:"http"` + FS FSStats `json:"fs"` +} + +type NodeStatsNodesInfoIndices struct { + StoreInfo StoreInfo `json:"store"` + DocumentStats DocumentStats `json:"docs"` + IndexingOperations IndexingOperations `json:"indexing"` + GetOperation GetOperation `json:"get"` + SearchOperations SearchOperations `json:"search"` + MergeOperations BasicIndexOperation `json:"merges"` + RefreshOperations BasicIndexOperation `json:"refresh"` + FlushOperations BasicIndexOperation `json:"flush"` + WarmerOperations BasicIndexOperation `json:"warmer"` + QueryCache BasicCacheInfo `json:"query_cache"` + FieldDataCache BasicCacheInfo `json:"fielddata"` +} + +type StoreInfo struct { + SizeInBy int64 `json:"size_in_bytes"` +} + +type BasicIndexOperation struct { + Total int64 `json:"total"` + TotalTimeInMs int64 `json:"total_time_in_millis"` +} + +type IndexingOperations struct { + IndexTotal int64 `json:"index_total"` + IndexTimeInMs int64 `json:"index_time_in_millis"` + DeleteTotal int64 `json:"delete_total"` + DeleteTimeInMs int64 `json:"delete_time_in_millis"` +} + +type GetOperation struct { + Total int64 `json:"total"` + TotalTimeInMs int64 `json:"time_in_millis"` +} + +type SearchOperations struct { + QueryTotal int64 `json:"query_total"` + QueryTimeInMs int64 `json:"query_time_in_millis"` + FetchTotal int64 `json:"fetch_total"` + FetchTimeInMs int64 `json:"fetch_time_in_millis"` + ScrollTotal int64 `json:"scroll_total"` + ScrollTimeInMs int64 `json:"scroll_time_in_millis"` + SuggestTotal int64 `json:"suggest_total"` + SuggestTimeInMs int64 `json:"suggest_time_in_millis"` +} + +type DocumentStats struct { + ActiveCount int64 `json:"count"` + DeletedCount int64 `json:"deleted"` +} + +type BasicCacheInfo struct { + Evictions int64 `json:"evictions"` + MemorySizeInBy int64 `json:"memory_size_in_bytes"` +} + +type JVMInfo struct { + JVMMemoryInfo JVMMemoryInfo `json:"mem"` + JVMThreadInfo JVMThreadInfo `json:"threads"` + JVMGCInfo JVMGCInfo `json:"gc"` +} + +type JVMMemoryInfo struct { + HeapUsedInBy int64 `json:"heap_used_in_bytes"` + NonHeapUsedInBy int64 `json:"non_heap_used_in_bytes"` +} + +type JVMThreadInfo struct { + PeakCount int64 `json:"peak_count"` + Count int64 `json:"count"` +} + +type JVMGCInfo struct { + Collectors JVMCollectors `json:"collectors"` +} + +type JVMCollectors struct { + Young BasicJVMCollectorInfo `json:"young"` + Old BasicJVMCollectorInfo `json:"old"` +} + +type BasicJVMCollectorInfo struct { + CollectionCount int64 `json:"collection_count"` + CollectionTimeInMillis int64 `json:"collection_time_in_millis"` +} + +type ThreadPoolStats struct { + TotalThreads int64 `json:"threads"` + ActiveThreads int64 `json:"active"` + QueuedTasks int64 `json:"queue"` + CompletedTasks int64 `json:"completed"` + RejectedTasks int64 `json:"rejected"` +} + +type ProcessStats struct { + OpenFileDescriptorsCount int64 `json:"open_file_descriptors"` +} + +type TransportStats struct { + OpenConnections int64 `json:"server_open"` + ReceivedBytes int64 `json:"rx_size_in_bytes"` + SentBytes int64 `json:"tx_size_in_bytes"` +} + +type HTTPStats struct { + OpenConnections int64 `json:"current_open"` +} + +type FSStats struct { + Total FSTotalStats `json:"total"` + IOStats *IOStats `json:"io_stats"` +} + +type FSTotalStats struct { + AvailableBytes int64 `json:"available_in_bytes"` + TotalBytes int64 `json:"total_in_bytes"` +} + +type IOStats struct { + Total IOStatsTotal `json:"total"` +} + +type IOStatsTotal struct { + ReadOperations int64 `json:"read_operations"` + WriteOperations int64 `json:"write_operations"` +} diff --git a/receiver/elasticsearchreceiver/testdata/sample_payloads/health.json b/receiver/elasticsearchreceiver/testdata/sample_payloads/health.json new file mode 100644 index 000000000000..ba76f60d02c1 --- /dev/null +++ b/receiver/elasticsearchreceiver/testdata/sample_payloads/health.json @@ -0,0 +1,17 @@ +{ + "cluster_name": "docker-cluster", + "status": "green", + "timed_out": false, + "number_of_nodes": 1, + "number_of_data_nodes": 1, + "active_primary_shards": 0, + "active_shards": 0, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 0, + "delayed_unassigned_shards": 0, + "number_of_pending_tasks": 0, + "number_of_in_flight_fetch": 0, + "task_max_waiting_in_queue_millis": 0, + "active_shards_percent_as_number": 100.0 +} \ No newline at end of file diff --git a/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_linux.json b/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_linux.json new file mode 100644 index 000000000000..dfa82ef8f8f4 --- /dev/null +++ b/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_linux.json @@ -0,0 +1,726 @@ +{ + "_nodes": { + "total": 1, + "successful": 1, + "failed": 0 + }, + "cluster_name": "docker-cluster", + "nodes": { + "szaFXm55RIeu8X-PTv5unQ": { + "timestamp": 1627669701946, + "name": "917e13e55eed", + "transport_address": "172.22.0.2:9300", + "host": "172.22.0.2", + "ip": "172.22.0.2:9300", + "roles": [ + "data", + "data_cold", + "data_content", + "data_frozen", + "data_hot", + "data_warm", + "ingest", + "master", + "ml", + "remote_cluster_client", + "transform" + ], + "attributes": { + "ml.machine_memory": "1073741824", + "xpack.installed": "true", + "transform.node": "true", + "ml.max_open_jobs": "512", + "ml.max_jvm_size": "536870912" + }, + "indices": { + "docs": { + "count": 0, + "deleted": 0 + }, + "store": { + "size_in_bytes": 0, + "total_data_set_size_in_bytes": 0, + "reserved_in_bytes": 0 + }, + "indexing": { + "index_total": 0, + "index_time_in_millis": 0, + "index_current": 0, + "index_failed": 0, + "delete_total": 0, + "delete_time_in_millis": 0, + "delete_current": 0, + "noop_update_total": 0, + "is_throttled": false, + "throttle_time_in_millis": 0 + }, + "get": { + "total": 0, + "time_in_millis": 0, + "exists_total": 0, + "exists_time_in_millis": 0, + "missing_total": 0, + "missing_time_in_millis": 0, + "current": 0 + }, + "search": { + "open_contexts": 0, + "query_total": 0, + "query_time_in_millis": 0, + "query_current": 0, + "fetch_total": 0, + "fetch_time_in_millis": 0, + "fetch_current": 0, + "scroll_total": 0, + "scroll_time_in_millis": 0, + "scroll_current": 0, + "suggest_total": 0, + "suggest_time_in_millis": 0, + "suggest_current": 0 + }, + "merges": { + "current": 0, + "current_docs": 0, + "current_size_in_bytes": 0, + "total": 0, + "total_time_in_millis": 0, + "total_docs": 0, + "total_size_in_bytes": 0, + "total_stopped_time_in_millis": 0, + "total_throttled_time_in_millis": 0, + "total_auto_throttle_in_bytes": 0 + }, + "refresh": { + "total": 0, + "total_time_in_millis": 0, + "external_total": 0, + "external_total_time_in_millis": 0, + "listeners": 0 + }, + "flush": { + "total": 0, + "periodic": 0, + "total_time_in_millis": 0 + }, + "warmer": { + "current": 0, + "total": 0, + "total_time_in_millis": 0 + }, + "query_cache": { + "memory_size_in_bytes": 0, + "total_count": 0, + "hit_count": 0, + "miss_count": 0, + "cache_size": 0, + "cache_count": 0, + "evictions": 0 + }, + "fielddata": { + "memory_size_in_bytes": 0, + "evictions": 0 + }, + "completion": { + "size_in_bytes": 0 + }, + "segments": { + "count": 0, + "memory_in_bytes": 0, + "terms_memory_in_bytes": 0, + "stored_fields_memory_in_bytes": 0, + "term_vectors_memory_in_bytes": 0, + "norms_memory_in_bytes": 0, + "points_memory_in_bytes": 0, + "doc_values_memory_in_bytes": 0, + "index_writer_memory_in_bytes": 0, + "version_map_memory_in_bytes": 0, + "fixed_bit_set_memory_in_bytes": 0, + "max_unsafe_auto_id_timestamp": -9223372036854775808, + "file_sizes": {} + }, + "translog": { + "operations": 0, + "size_in_bytes": 0, + "uncommitted_operations": 0, + "uncommitted_size_in_bytes": 0, + "earliest_last_modified_age": 0 + }, + "request_cache": { + "memory_size_in_bytes": 0, + "evictions": 0, + "hit_count": 0, + "miss_count": 0 + }, + "recovery": { + "current_as_source": 0, + "current_as_target": 0, + "throttle_time_in_millis": 0 + } + }, + "os": { + "timestamp": 1627669701947, + "cpu": { + "percent": 3, + "load_average": { + "1m": 0.0, + "5m": 0.02, + "15m": 0.02 + } + }, + "mem": { + "total_in_bytes": 1073741824, + "free_in_bytes": 294109184, + "used_in_bytes": 779632640, + "free_percent": 27, + "used_percent": 73 + }, + "swap": { + "total_in_bytes": 1073741824, + "free_in_bytes": 1073741824, + "used_in_bytes": 0 + }, + "cgroup": { + "cpuacct": { + "control_group": "/", + "usage_nanos": 45612972897 + }, + "cpu": { + "control_group": "/", + "cfs_period_micros": 100000, + "cfs_quota_micros": 100000, + "stat": { + "number_of_elapsed_periods": 12406, + "number_of_times_throttled": 298, + "time_throttled_nanos": 34855164850 + } + }, + "memory": { + "control_group": "/", + "limit_in_bytes": "1073741824", + "usage_in_bytes": "779632640" + } + } + }, + "process": { + "timestamp": 1627669701948, + "open_file_descriptors": 270, + "max_file_descriptors": 1048576, + "cpu": { + "percent": 0, + "total_in_millis": 42970 + }, + "mem": { + "total_virtual_in_bytes": 4961767424 + } + }, + "jvm": { + "timestamp": 1627669701948, + "uptime_in_millis": 2059021, + "mem": { + "heap_used_in_bytes": 305152000, + "heap_used_percent": 56, + "heap_committed_in_bytes": 536870912, + "heap_max_in_bytes": 536870912, + "non_heap_used_in_bytes": 128825192, + "non_heap_committed_in_bytes": 131792896, + "pools": { + "young": { + "used_in_bytes": 218103808, + "max_in_bytes": 0, + "peak_used_in_bytes": 314572800, + "peak_max_in_bytes": 0 + }, + "old": { + "used_in_bytes": 76562432, + "max_in_bytes": 536870912, + "peak_used_in_bytes": 76562432, + "peak_max_in_bytes": 536870912 + }, + "survivor": { + "used_in_bytes": 10485760, + "max_in_bytes": 0, + "peak_used_in_bytes": 41943040, + "peak_max_in_bytes": 0 + } + } + }, + "threads": { + "count": 27, + "peak_count": 28 + }, + "gc": { + "collectors": { + "young": { + "collection_count": 20, + "collection_time_in_millis": 930 + }, + "old": { + "collection_count": 10, + "collection_time_in_millis": 5 + } + } + }, + "buffer_pools": { + "mapped": { + "count": 0, + "used_in_bytes": 0, + "total_capacity_in_bytes": 0 + }, + "direct": { + "count": 9, + "used_in_bytes": 1070323, + "total_capacity_in_bytes": 1070322 + }, + "mapped - 'non-volatile memory'": { + "count": 0, + "used_in_bytes": 0, + "total_capacity_in_bytes": 0 + } + }, + "classes": { + "current_loaded_count": 20695, + "total_loaded_count": 20695, + "total_unloaded_count": 0 + } + }, + "thread_pool": { + "analyze": { + "threads": 1, + "queue": 2, + "active": 3, + "rejected": 4, + "largest": 5, + "completed": 6 + } + }, + "fs": { + "timestamp": 1627669701948, + "total": { + "total_in_bytes": 67371577344, + "free_in_bytes": 15746158592, + "available_in_bytes": 12293464064 + }, + "data": [ + { + "path": "/usr/share/elasticsearch/data/nodes/0", + "mount": "/ (overlay)", + "type": "overlay", + "total_in_bytes": 67371577344, + "free_in_bytes": 15746158592, + "available_in_bytes": 12293464064 + } + ], + "io_stats": { + "total": { + "operations": 49169, + "read_operations": 39304, + "write_operations": 9865, + "read_kilobytes": 1617780, + "write_kilobytes": 602016, + "io_time_in_millis": 27780 + } + } + }, + "transport": { + "server_open": 0, + "total_outbound_connections": 0, + "rx_count": 0, + "rx_size_in_bytes": 0, + "tx_count": 0, + "tx_size_in_bytes": 0 + }, + "http": { + "current_open": 2, + "total_opened": 3, + "clients": [ + { + "id": 1644878830, + "opened_time_millis": 1627669701929, + "closed_time_millis": 1627669701929, + "last_request_time_millis": -1, + "request_count": 0, + "request_size_bytes": 0 + }, + { + "id": 2001891351, + "agent": "Go-http-client/1.1", + "local_address": "172.22.0.2:9200", + "remote_address": "172.22.0.1:57136", + "last_uri": "/_cluster/health", + "opened_time_millis": 1627667715500, + "last_request_time_millis": 1627669695490, + "request_count": 399, + "request_size_bytes": 0 + }, + { + "id": 103547676, + "agent": "PostmanRuntime/7.28.2", + "local_address": "172.22.0.2:9200", + "remote_address": "172.22.0.1:57276", + "last_uri": "/_nodes/*/stats/", + "opened_time_millis": 1627669701929, + "last_request_time_millis": 1627669701929, + "request_count": 1, + "request_size_bytes": 0 + } + ] + }, + "breakers": { + "request": { + "limit_size_in_bytes": 322122547, + "limit_size": "307.1mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 1.0, + "tripped": 0 + }, + "fielddata": { + "limit_size_in_bytes": 214748364, + "limit_size": "204.7mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 1.03, + "tripped": 0 + }, + "in_flight_requests": { + "limit_size_in_bytes": 536870912, + "limit_size": "512mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 2.0, + "tripped": 0 + }, + "model_inference": { + "limit_size_in_bytes": 268435456, + "limit_size": "256mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 1.0, + "tripped": 0 + }, + "accounting": { + "limit_size_in_bytes": 536870912, + "limit_size": "512mb", + "estimated_size_in_bytes": 0, + "estimated_size": "0b", + "overhead": 1.0, + "tripped": 0 + }, + "parent": { + "limit_size_in_bytes": 510027366, + "limit_size": "486.3mb", + "estimated_size_in_bytes": 305152000, + "estimated_size": "291mb", + "overhead": 1.0, + "tripped": 0 + } + }, + "script": { + "compilations": 1, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + "discovery": { + "cluster_state_queue": { + "total": 0, + "pending": 0, + "committed": 0 + }, + "published_cluster_states": { + "full_states": 2, + "incompatible_diffs": 0, + "compatible_diffs": 1 + } + }, + "ingest": { + "total": { + "count": 0, + "time_in_millis": 0, + "current": 0, + "failed": 0 + }, + "pipelines": { + "xpack_monitoring_6": { + "count": 0, + "time_in_millis": 0, + "current": 0, + "failed": 0, + "processors": [ + { + "script": { + "type": "script", + "stats": { + "count": 0, + "time_in_millis": 0, + "current": 0, + "failed": 0 + } + } + }, + { + "gsub": { + "type": "gsub", + "stats": { + "count": 0, + "time_in_millis": 0, + "current": 0, + "failed": 0 + } + } + } + ] + }, + "xpack_monitoring_7": { + "count": 0, + "time_in_millis": 0, + "current": 0, + "failed": 0, + "processors": [] + } + } + }, + "adaptive_selection": {}, + "script_cache": { + "sum": { + "compilations": 1, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + "contexts": [ + { + "context": "aggregation_selector", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "aggs", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "aggs_combine", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "aggs_init", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "aggs_map", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "aggs_reduce", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "analysis", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "boolean_field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "bucket_aggregation", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "date_field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "double_field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "filter", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "geo_point_field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "ingest", + "compilations": 1, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "ingest_template", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "interval", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "ip_field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "keyword_field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "long_field", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "moving-function", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "number_sort", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "painless_test", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "processor_conditional", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "score", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "script_heuristic", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "similarity", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "similarity_weight", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "string_sort", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "template", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "terms_set", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "update", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "watcher_condition", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "watcher_transform", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + }, + { + "context": "xpack_template", + "compilations": 0, + "cache_evictions": 0, + "compilation_limit_triggered": 0 + } + ] + }, + "indexing_pressure": { + "memory": { + "current": { + "combined_coordinating_and_primary_in_bytes": 0, + "coordinating_in_bytes": 0, + "primary_in_bytes": 0, + "replica_in_bytes": 0, + "all_in_bytes": 0 + }, + "total": { + "combined_coordinating_and_primary_in_bytes": 0, + "coordinating_in_bytes": 0, + "primary_in_bytes": 0, + "replica_in_bytes": 0, + "all_in_bytes": 0, + "coordinating_rejections": 0, + "primary_rejections": 0, + "replica_rejections": 0 + }, + "limit_in_bytes": 53687091 + } + } + } + } +} diff --git a/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json b/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json new file mode 100644 index 000000000000..d376612e516a --- /dev/null +++ b/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json @@ -0,0 +1,269 @@ +{ + "_nodes": { + "total": 1, + "successful": 1, + "failed": 0 + }, + "cluster_name": "elasticsearch", + "nodes": { + "i0X7Va8lSS-X7aqfGmCTPg": { + "timestamp": 1639500114496, + "name": "local-node", + "transport_address": "127.0.0.1:9300", + "host": "127.0.0.1", + "ip": "127.0.0.1:9300", + "roles": [ + "data", + "data_cold", + "data_content", + "data_frozen", + "data_hot", + "data_warm", + "ingest", + "master", + "ml", + "remote_cluster_client", + "transform" + ], + "attributes": { + "ml.machine_memory": "17179869184", + "xpack.installed": "true", + "transform.node": "true", + "ml.max_open_jobs": "512", + "ml.max_jvm_size": "8589934592" + }, + "indices": { + "docs": { + "count": 1049, + "deleted": 2 + }, + "store": { + "size_in_bytes": 43495907, + "total_data_set_size_in_bytes": 43495907, + "reserved_in_bytes": 0 + }, + "indexing": { + "index_total": 1049, + "index_time_in_millis": 4518, + "index_current": 0, + "index_failed": 0, + "delete_total": 2, + "delete_time_in_millis": 13, + "delete_current": 0, + "noop_update_total": 0, + "is_throttled": false, + "throttle_time_in_millis": 0 + }, + "get": { + "total": 2, + "time_in_millis": 20, + "exists_total": 0, + "exists_time_in_millis": 0, + "missing_total": 0, + "missing_time_in_millis": 0, + "current": 0 + }, + "search": { + "open_contexts": 0, + "query_total": 55, + "query_time_in_millis": 103, + "query_current": 0, + "fetch_total": 55, + "fetch_time_in_millis": 85, + "fetch_current": 0, + "scroll_total": 3, + "scroll_time_in_millis": 51, + "scroll_current": 0, + "suggest_total": 1, + "suggest_time_in_millis": 17, + "suggest_current": 0 + }, + "merges": { + "current": 0, + "current_docs": 0, + "current_size_in_bytes": 0, + "total": 3, + "total_time_in_millis": 10, + "total_docs": 0, + "total_size_in_bytes": 0, + "total_stopped_time_in_millis": 0, + "total_throttled_time_in_millis": 0, + "total_auto_throttle_in_bytes": 83886080 + }, + "refresh": { + "total": 31, + "total_time_in_millis": 2785, + "external_total": 25, + "external_total_time_in_millis": 2831, + "listeners": 0 + }, + "flush": { + "total": 7, + "periodic": 0, + "total_time_in_millis": 1468 + }, + "warmer": { + "current": 0, + "total": 19, + "total_time_in_millis": 1 + }, + "query_cache": { + "memory_size_in_bytes": 12, + "total_count": 3, + "hit_count": 5, + "miss_count": 0, + "cache_size": 0, + "cache_count": 0, + "evictions": 2 + }, + "fielddata": { + "memory_size_in_bytes": 200, + "evictions": 1 + } + }, + "process": { + "timestamp": 1639500114496, + "open_file_descriptors": 344, + "max_file_descriptors": 10240, + "cpu": { + "percent": 0, + "total_in_millis": 55860 + }, + "mem": { + "total_virtual_in_bytes": 15414005760 + } + }, + "jvm": { + "timestamp": 1639500114497, + "uptime_in_millis": 995417, + "mem": { + "heap_used_in_bytes": 593718576, + "heap_used_percent": 6, + "heap_committed_in_bytes": 8589934592, + "heap_max_in_bytes": 8589934592, + "non_heap_used_in_bytes": 154454456, + "non_heap_committed_in_bytes": 157548544, + "pools": { + "young": { + "used_in_bytes": 490733568, + "max_in_bytes": 0, + "peak_used_in_bytes": 612368384, + "peak_max_in_bytes": 0 + }, + "old": { + "used_in_bytes": 58787328, + "max_in_bytes": 8589934592, + "peak_used_in_bytes": 58787328, + "peak_max_in_bytes": 8589934592 + }, + "survivor": { + "used_in_bytes": 44197680, + "max_in_bytes": 0, + "peak_used_in_bytes": 66966784, + "peak_max_in_bytes": 0 + } + } + }, + "threads": { + "count": 96, + "peak_count": 96 + }, + "gc": { + "collectors": { + "young": { + "collection_count": 9, + "collection_time_in_millis": 189 + }, + "old": { + "collection_count": 3, + "collection_time_in_millis": 10 + } + } + }, + "buffer_pools": { + "mapped": { + "count": 17, + "used_in_bytes": 43480134, + "total_capacity_in_bytes": 43480134 + }, + "direct": { + "count": 53, + "used_in_bytes": 13473399, + "total_capacity_in_bytes": 13473398 + }, + "mapped - 'non-volatile memory'": { + "count": 0, + "used_in_bytes": 0, + "total_capacity_in_bytes": 0 + } + }, + "classes": { + "current_loaded_count": 24623, + "total_loaded_count": 24623, + "total_unloaded_count": 0 + } + }, + "thread_pool": { + "analyze": { + "threads": 2, + "queue": 3, + "active": 4, + "rejected": 5, + "largest": 6, + "completed": 7 + }, + "write": { + "threads": 12, + "queue": 2, + "active": 4, + "rejected": 5, + "largest": 12, + "completed": 15 + } + }, + "fs": { + "timestamp": 1639500114497, + "total": { + "total_in_bytes": 499963174912, + "free_in_bytes": 273278246912, + "available_in_bytes": 245969842176 + }, + "data": [ + { + "path": "/usr/local/var/lib/elasticsearch/nodes/0", + "mount": "/System/Volumes/Data (/dev/disk1s2)", + "type": "apfs", + "total_in_bytes": 499963174912, + "free_in_bytes": 273278246912, + "available_in_bytes": 245969842176 + } + ] + }, + "transport": { + "server_open": 0, + "total_outbound_connections": 0, + "rx_count": 0, + "rx_size_in_bytes": 105, + "tx_count": 1, + "tx_size_in_bytes": 24 + }, + "http": { + "current_open": 1, + "total_opened": 28, + "clients": [ + { + "id": 551251092, + "agent": "curl/7.64.1", + "local_address": "127.0.0.1:9201", + "remote_address": "127.0.0.1:60869", + "last_uri": "/_nodes/stats/indices,process,jvm,thread_pool,transport,http,fs/store,docs,indexing,get,search,merge,refresh,flush,warmer,query_cache,fielddata", + "opened_time_millis": 1639500114343, + "last_request_time_millis": 1639500114343, + "request_count": 1, + "request_size_bytes": 0 + } + ] + } + } + } +} From 9954acb346bea4f2dc4cfcdf1ec3269fac74b734 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 17 Dec 2021 14:23:39 -0500 Subject: [PATCH 02/19] Add status to cluster health struct --- receiver/elasticsearchreceiver/internal/model/clusterhealth.go | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/elasticsearchreceiver/internal/model/clusterhealth.go b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go index b81086d5d7b0..2be7dc3c53e5 100644 --- a/receiver/elasticsearchreceiver/internal/model/clusterhealth.go +++ b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go @@ -11,4 +11,5 @@ type ClusterHealth struct { UnassignedShards int64 `json:"unassigned_shards"` NodeCount int64 `json:"number_of_nodes"` DataNodeCount int64 `json:"number_of_data_nodes"` + Status string `json:"status"` } From 2080634e2ed227c6629eed8d94924859afaaa057 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 17 Dec 2021 14:41:54 -0500 Subject: [PATCH 03/19] add jvm uptime to nodestats --- receiver/elasticsearchreceiver/internal/model/nodestats.go | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/elasticsearchreceiver/internal/model/nodestats.go b/receiver/elasticsearchreceiver/internal/model/nodestats.go index b6a618844979..56e0923b3e70 100644 --- a/receiver/elasticsearchreceiver/internal/model/nodestats.go +++ b/receiver/elasticsearchreceiver/internal/model/nodestats.go @@ -77,6 +77,7 @@ type BasicCacheInfo struct { } type JVMInfo struct { + UptimeInMs int64 `json:"uptime_in_millis"` JVMMemoryInfo JVMMemoryInfo `json:"mem"` JVMThreadInfo JVMThreadInfo `json:"threads"` JVMGCInfo JVMGCInfo `json:"gc"` From 35e991d9062b56b3a9042a003f30adb18536fa2f Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Fri, 17 Dec 2021 16:09:59 -0500 Subject: [PATCH 04/19] remove unused sample payload (for now) --- .../sample_payloads/nodes_others.json | 269 ------------------ 1 file changed, 269 deletions(-) delete mode 100644 receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json diff --git a/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json b/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json deleted file mode 100644 index d376612e516a..000000000000 --- a/receiver/elasticsearchreceiver/testdata/sample_payloads/nodes_others.json +++ /dev/null @@ -1,269 +0,0 @@ -{ - "_nodes": { - "total": 1, - "successful": 1, - "failed": 0 - }, - "cluster_name": "elasticsearch", - "nodes": { - "i0X7Va8lSS-X7aqfGmCTPg": { - "timestamp": 1639500114496, - "name": "local-node", - "transport_address": "127.0.0.1:9300", - "host": "127.0.0.1", - "ip": "127.0.0.1:9300", - "roles": [ - "data", - "data_cold", - "data_content", - "data_frozen", - "data_hot", - "data_warm", - "ingest", - "master", - "ml", - "remote_cluster_client", - "transform" - ], - "attributes": { - "ml.machine_memory": "17179869184", - "xpack.installed": "true", - "transform.node": "true", - "ml.max_open_jobs": "512", - "ml.max_jvm_size": "8589934592" - }, - "indices": { - "docs": { - "count": 1049, - "deleted": 2 - }, - "store": { - "size_in_bytes": 43495907, - "total_data_set_size_in_bytes": 43495907, - "reserved_in_bytes": 0 - }, - "indexing": { - "index_total": 1049, - "index_time_in_millis": 4518, - "index_current": 0, - "index_failed": 0, - "delete_total": 2, - "delete_time_in_millis": 13, - "delete_current": 0, - "noop_update_total": 0, - "is_throttled": false, - "throttle_time_in_millis": 0 - }, - "get": { - "total": 2, - "time_in_millis": 20, - "exists_total": 0, - "exists_time_in_millis": 0, - "missing_total": 0, - "missing_time_in_millis": 0, - "current": 0 - }, - "search": { - "open_contexts": 0, - "query_total": 55, - "query_time_in_millis": 103, - "query_current": 0, - "fetch_total": 55, - "fetch_time_in_millis": 85, - "fetch_current": 0, - "scroll_total": 3, - "scroll_time_in_millis": 51, - "scroll_current": 0, - "suggest_total": 1, - "suggest_time_in_millis": 17, - "suggest_current": 0 - }, - "merges": { - "current": 0, - "current_docs": 0, - "current_size_in_bytes": 0, - "total": 3, - "total_time_in_millis": 10, - "total_docs": 0, - "total_size_in_bytes": 0, - "total_stopped_time_in_millis": 0, - "total_throttled_time_in_millis": 0, - "total_auto_throttle_in_bytes": 83886080 - }, - "refresh": { - "total": 31, - "total_time_in_millis": 2785, - "external_total": 25, - "external_total_time_in_millis": 2831, - "listeners": 0 - }, - "flush": { - "total": 7, - "periodic": 0, - "total_time_in_millis": 1468 - }, - "warmer": { - "current": 0, - "total": 19, - "total_time_in_millis": 1 - }, - "query_cache": { - "memory_size_in_bytes": 12, - "total_count": 3, - "hit_count": 5, - "miss_count": 0, - "cache_size": 0, - "cache_count": 0, - "evictions": 2 - }, - "fielddata": { - "memory_size_in_bytes": 200, - "evictions": 1 - } - }, - "process": { - "timestamp": 1639500114496, - "open_file_descriptors": 344, - "max_file_descriptors": 10240, - "cpu": { - "percent": 0, - "total_in_millis": 55860 - }, - "mem": { - "total_virtual_in_bytes": 15414005760 - } - }, - "jvm": { - "timestamp": 1639500114497, - "uptime_in_millis": 995417, - "mem": { - "heap_used_in_bytes": 593718576, - "heap_used_percent": 6, - "heap_committed_in_bytes": 8589934592, - "heap_max_in_bytes": 8589934592, - "non_heap_used_in_bytes": 154454456, - "non_heap_committed_in_bytes": 157548544, - "pools": { - "young": { - "used_in_bytes": 490733568, - "max_in_bytes": 0, - "peak_used_in_bytes": 612368384, - "peak_max_in_bytes": 0 - }, - "old": { - "used_in_bytes": 58787328, - "max_in_bytes": 8589934592, - "peak_used_in_bytes": 58787328, - "peak_max_in_bytes": 8589934592 - }, - "survivor": { - "used_in_bytes": 44197680, - "max_in_bytes": 0, - "peak_used_in_bytes": 66966784, - "peak_max_in_bytes": 0 - } - } - }, - "threads": { - "count": 96, - "peak_count": 96 - }, - "gc": { - "collectors": { - "young": { - "collection_count": 9, - "collection_time_in_millis": 189 - }, - "old": { - "collection_count": 3, - "collection_time_in_millis": 10 - } - } - }, - "buffer_pools": { - "mapped": { - "count": 17, - "used_in_bytes": 43480134, - "total_capacity_in_bytes": 43480134 - }, - "direct": { - "count": 53, - "used_in_bytes": 13473399, - "total_capacity_in_bytes": 13473398 - }, - "mapped - 'non-volatile memory'": { - "count": 0, - "used_in_bytes": 0, - "total_capacity_in_bytes": 0 - } - }, - "classes": { - "current_loaded_count": 24623, - "total_loaded_count": 24623, - "total_unloaded_count": 0 - } - }, - "thread_pool": { - "analyze": { - "threads": 2, - "queue": 3, - "active": 4, - "rejected": 5, - "largest": 6, - "completed": 7 - }, - "write": { - "threads": 12, - "queue": 2, - "active": 4, - "rejected": 5, - "largest": 12, - "completed": 15 - } - }, - "fs": { - "timestamp": 1639500114497, - "total": { - "total_in_bytes": 499963174912, - "free_in_bytes": 273278246912, - "available_in_bytes": 245969842176 - }, - "data": [ - { - "path": "/usr/local/var/lib/elasticsearch/nodes/0", - "mount": "/System/Volumes/Data (/dev/disk1s2)", - "type": "apfs", - "total_in_bytes": 499963174912, - "free_in_bytes": 273278246912, - "available_in_bytes": 245969842176 - } - ] - }, - "transport": { - "server_open": 0, - "total_outbound_connections": 0, - "rx_count": 0, - "rx_size_in_bytes": 105, - "tx_count": 1, - "tx_size_in_bytes": 24 - }, - "http": { - "current_open": 1, - "total_opened": 28, - "clients": [ - { - "id": 551251092, - "agent": "curl/7.64.1", - "local_address": "127.0.0.1:9201", - "remote_address": "127.0.0.1:60869", - "last_uri": "/_nodes/stats/indices,process,jvm,thread_pool,transport,http,fs/store,docs,indexing,get,search,merge,refresh,flush,warmer,query_cache,fielddata", - "opened_time_millis": 1639500114343, - "last_request_time_millis": 1639500114343, - "request_count": 1, - "request_size_bytes": 0 - } - ] - } - } - } -} From 559bcb5547d47eb6478159558f527822739d8fc1 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Mon, 3 Jan 2022 14:13:51 -0500 Subject: [PATCH 05/19] Allow client to specify nodes to get stats from --- receiver/elasticsearchreceiver/client.go | 10 ++++++---- receiver/elasticsearchreceiver/client_test.go | 14 +++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 6214468c7347..a32328b12e21 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "net/url" + "strings" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" "go.uber.org/zap" @@ -21,11 +22,11 @@ var ( // elasticsearchClient defines the interface to retreive metrics from an Elasticsearch cluster. type elasticsearchClient interface { - NodeStats(ctx context.Context) (*model.NodeStats, error) + NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error) ClusterHealth(ctx context.Context) (*model.ClusterHealth, error) } -// defaultElasticsearchClient is the main implementation of elastisearchClient. +// defaultElasticsearchClient is the main implementation of elasticsearchClient. // It retrieves the required metrics from Elasticsearch's REST api. type defaultElasticsearchClient struct { client *http.Client @@ -60,8 +61,9 @@ const nodeStatsMetrics = "indices,process,jvm,thread_pool,transport,http,fs" // nodeStatsIndexMetrics is a comma separated list of index metrics that will be gathered from NodeStats. const nodeStatsIndexMetrics = "store,docs,indexing,get,search,merge,refresh,flush,warmer,query_cache,fielddata" -func (c defaultElasticsearchClient) NodeStats(ctx context.Context) (*model.NodeStats, error) { - nodeStatsPath := fmt.Sprintf("_nodes/stats/%s/%s", nodeStatsMetrics, nodeStatsIndexMetrics) +func (c defaultElasticsearchClient) NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error) { + nodeSpec := strings.Join(nodes, ",") + nodeStatsPath := fmt.Sprintf("_nodes/%s/stats/%s/%s", nodeSpec, nodeStatsMetrics, nodeStatsIndexMetrics) body, err := c.doRequest(ctx, nodeStatsPath) if err != nil { diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index 23d469f59060..7eac3f1bf6c7 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -30,7 +30,7 @@ func TestNodeStatsNoPassword(t *testing.T) { client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") ctx := context.Background() - nodeStats, err := client.NodeStats(ctx) + nodeStats, err := client.NodeStats(ctx, []string{"_all"}) require.NoError(t, err) require.Equal(t, &actualNodeStats, nodeStats) @@ -54,7 +54,7 @@ func TestNodeStatsAuthentication(t *testing.T) { client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, username, password) ctx := context.Background() - nodeStats, err := client.NodeStats(ctx) + nodeStats, err := client.NodeStats(ctx, []string{"_all"}) require.NoError(t, err) require.Equal(t, &actualNodeStats, nodeStats) @@ -69,7 +69,7 @@ func TestNodeStatsNoAuthentication(t *testing.T) { client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") ctx := context.Background() - _, err = client.NodeStats(ctx) + _, err = client.NodeStats(ctx, []string{"_all"}) require.ErrorIs(t, err, errUnauthenticated) } @@ -82,7 +82,7 @@ func TestNodeStatsBadAuthentication(t *testing.T) { client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "unauthorized_user", "password") ctx := context.Background() - _, err = client.NodeStats(ctx) + _, err = client.NodeStats(ctx, []string{"_all"}) require.ErrorIs(t, err, errUnauthorized) } @@ -158,7 +158,7 @@ func TestClusterHealthNoAuthorization(t *testing.T) { } func TestDoRequestBadPath(t *testing.T) { - url, err := url.Parse("http://example.local:9200") + url, err := url.Parse("http://example.localhost:9200") require.NoError(t, err) client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "bad_username", "bad_password") @@ -167,7 +167,7 @@ func TestDoRequestBadPath(t *testing.T) { } func TestDoRequestClientTimeout(t *testing.T) { - url, err := url.Parse("http://example.local:9200") + url, err := url.Parse("http://example.localhost:9200") require.NoError(t, err) client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "bad_username", "bad_password") @@ -213,7 +213,7 @@ func mockServer(t *testing.T, username, password string) *httptest.Server { } } - if strings.HasPrefix(req.URL.Path, "/_nodes/stats") { + if strings.HasPrefix(req.URL.Path, "/_nodes/_all/stats") { rw.WriteHeader(200) _, err = rw.Write(nodes) require.NoError(t, err) From fe64f0c9a20f2877c0e393308e96ca4631fa1c06 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 14:01:19 -0500 Subject: [PATCH 06/19] add empty nodes check, add test for nil/empty nodes --- receiver/elasticsearchreceiver/client.go | 8 ++++++- receiver/elasticsearchreceiver/client_test.go | 21 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index a32328b12e21..17febfec4783 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -62,7 +62,13 @@ const nodeStatsMetrics = "indices,process,jvm,thread_pool,transport,http,fs" const nodeStatsIndexMetrics = "store,docs,indexing,get,search,merge,refresh,flush,warmer,query_cache,fielddata" func (c defaultElasticsearchClient) NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error) { - nodeSpec := strings.Join(nodes, ",") + var nodeSpec string + if len(nodes) > 0 { + nodeSpec = strings.Join(nodes, ",") + } else { + nodeSpec = "_all" + } + nodeStatsPath := fmt.Sprintf("_nodes/%s/stats/%s/%s", nodeSpec, nodeStatsMetrics, nodeStatsIndexMetrics) body, err := c.doRequest(ctx, nodeStatsPath) diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index 7eac3f1bf6c7..c8b1c16ca31d 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -36,6 +36,27 @@ func TestNodeStatsNoPassword(t *testing.T) { require.Equal(t, &actualNodeStats, nodeStats) } +func TestNodeStatsNilNodes(t *testing.T) { + nodeJson, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") + require.NoError(t, err) + + actualNodeStats := model.NodeStats{} + require.NoError(t, json.Unmarshal(nodeJson, &actualNodeStats)) + + elasticsearchMock := mockServer(t, "", "") + defer elasticsearchMock.Close() + + url, err := url.Parse(elasticsearchMock.URL) + require.NoError(t, err) + + client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") + ctx := context.Background() + nodeStats, err := client.NodeStats(ctx, nil) + require.NoError(t, err) + + require.Equal(t, &actualNodeStats, nodeStats) +} + func TestNodeStatsAuthentication(t *testing.T) { nodeJson, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") require.NoError(t, err) From 67bc0b205432f09d3239b083b9a29106daf54549 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 14:04:05 -0500 Subject: [PATCH 07/19] go mod tidy --- receiver/elasticsearchreceiver/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/elasticsearchreceiver/go.mod b/receiver/elasticsearchreceiver/go.mod index 05b4bdede39c..1a5ced0eaed5 100644 --- a/receiver/elasticsearchreceiver/go.mod +++ b/receiver/elasticsearchreceiver/go.mod @@ -7,6 +7,7 @@ require ( go.opentelemetry.io/collector v0.41.1-0.20220105191026-60823e25df1c go.opentelemetry.io/collector/model v0.41.1-0.20220105191026-60823e25df1c go.uber.org/multierr v1.7.0 + go.uber.org/zap v1.20.0 ) require ( @@ -37,7 +38,6 @@ require ( go.opentelemetry.io/otel/metric v0.26.0 // indirect go.opentelemetry.io/otel/trace v1.3.0 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/zap v1.20.0 // indirect golang.org/x/net v0.0.0-20211108170745-6635138e15ea // indirect golang.org/x/sys v0.0.0-20211205182925-97ca703d548d // indirect golang.org/x/text v0.3.7 // indirect From 96e0747fcb56b0f511f06e18ccc9799b59cfcda9 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 14:13:01 -0500 Subject: [PATCH 08/19] minor wording tweaks --- receiver/elasticsearchreceiver/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 17febfec4783..b986a5f29ff8 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -54,7 +54,7 @@ func newElasticsearchClient(logger *zap.Logger, client *http.Client, endpoint *u } // nodeStatsMetrics is a comma separated list of metrics that will be gathered from NodeStats. -// The available metrics are documented here for elasticsearch 7.9: +// The available metrics are documented here for Elasticsearch 7.9: // https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster-nodes-stats.html#cluster-nodes-stats-api-path-params const nodeStatsMetrics = "indices,process,jvm,thread_pool,transport,http,fs" @@ -116,7 +116,7 @@ func (c defaultElasticsearchClient) doRequest(ctx context.Context, path string) if resp.StatusCode != 200 { body, err := io.ReadAll(resp.Body) c.logger.Debug( - "Request returned bad status code; Couldn't read request body.", + "Failed to make request to Elasticsearch", zap.String("path", path), zap.Int("status_code", resp.StatusCode), zap.ByteString("body", body), From 8a8578024495b505b6f270cd89b55f84e364fe83 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 14:16:03 -0500 Subject: [PATCH 09/19] add in max heap mem to nodestats model --- receiver/elasticsearchreceiver/internal/model/nodestats.go | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/elasticsearchreceiver/internal/model/nodestats.go b/receiver/elasticsearchreceiver/internal/model/nodestats.go index 56e0923b3e70..c29c409fcca9 100644 --- a/receiver/elasticsearchreceiver/internal/model/nodestats.go +++ b/receiver/elasticsearchreceiver/internal/model/nodestats.go @@ -86,6 +86,7 @@ type JVMInfo struct { type JVMMemoryInfo struct { HeapUsedInBy int64 `json:"heap_used_in_bytes"` NonHeapUsedInBy int64 `json:"non_heap_used_in_bytes"` + MaxHeapInBy int64 `json:"heap_max_in_bytes"` } type JVMThreadInfo struct { From f0743e60ea0ff17147dd69d7c2aeebe5882fbc0b Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 14:35:27 -0500 Subject: [PATCH 10/19] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73fe76cf5368..de451ebeff37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## 💡 Enhancements 💡 - `couchdbreceiver`: Add couchdb client (#6880) +- `elasticsearchreceiver`: Implement scraper client (#7019) - `prometheusremotewriteexporter`: Handling Staleness flag from OTLP (#6679) - `prometheusexporter`: Handling Staleness flag from OTLP (#6805) - `prometheusreceiver`: Set OTLP no-data-present flag for stale scraped metrics. (#7043) From 401793ee01562fcb5e50719389628eb130b2862e Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 14:40:42 -0500 Subject: [PATCH 11/19] fix lint errors --- receiver/elasticsearchreceiver/client.go | 3 ++- receiver/elasticsearchreceiver/client_test.go | 21 ++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index b986a5f29ff8..77ce1cbb1bd5 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" + "go.uber.org/zap" ) @@ -20,7 +21,7 @@ var ( errUnauthorized = errors.New("status 403, unauthorized") ) -// elasticsearchClient defines the interface to retreive metrics from an Elasticsearch cluster. +// elasticsearchClient defines the interface to retrieve metrics from an Elasticsearch cluster. type elasticsearchClient interface { NodeStats(ctx context.Context, nodes []string) (*model.NodeStats, error) ClusterHealth(ctx context.Context) (*model.ClusterHealth, error) diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index c8b1c16ca31d..133440aed8c5 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -11,16 +11,17 @@ import ( "testing" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" + "github.com/stretchr/testify/require" "go.uber.org/zap" ) func TestNodeStatsNoPassword(t *testing.T) { - nodeJson, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") + nodeJSON, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") require.NoError(t, err) actualNodeStats := model.NodeStats{} - require.NoError(t, json.Unmarshal(nodeJson, &actualNodeStats)) + require.NoError(t, json.Unmarshal(nodeJSON, &actualNodeStats)) elasticsearchMock := mockServer(t, "", "") defer elasticsearchMock.Close() @@ -37,11 +38,11 @@ func TestNodeStatsNoPassword(t *testing.T) { } func TestNodeStatsNilNodes(t *testing.T) { - nodeJson, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") + nodeJSON, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") require.NoError(t, err) actualNodeStats := model.NodeStats{} - require.NoError(t, json.Unmarshal(nodeJson, &actualNodeStats)) + require.NoError(t, json.Unmarshal(nodeJSON, &actualNodeStats)) elasticsearchMock := mockServer(t, "", "") defer elasticsearchMock.Close() @@ -58,11 +59,11 @@ func TestNodeStatsNilNodes(t *testing.T) { } func TestNodeStatsAuthentication(t *testing.T) { - nodeJson, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") + nodeJSON, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") require.NoError(t, err) actualNodeStats := model.NodeStats{} - require.NoError(t, json.Unmarshal(nodeJson, &actualNodeStats)) + require.NoError(t, json.Unmarshal(nodeJSON, &actualNodeStats)) username := "user" password := "pass" @@ -108,11 +109,11 @@ func TestNodeStatsBadAuthentication(t *testing.T) { } func TestClusterHealthNoPassword(t *testing.T) { - healthJson, err := ioutil.ReadFile("./testdata/sample_payloads/health.json") + healthJSON, err := ioutil.ReadFile("./testdata/sample_payloads/health.json") require.NoError(t, err) actualClusterHealth := model.ClusterHealth{} - require.NoError(t, json.Unmarshal(healthJson, &actualClusterHealth)) + require.NoError(t, json.Unmarshal(healthJSON, &actualClusterHealth)) elasticsearchMock := mockServer(t, "", "") defer elasticsearchMock.Close() @@ -129,11 +130,11 @@ func TestClusterHealthNoPassword(t *testing.T) { } func TestClusterHealthAuthentication(t *testing.T) { - healthJson, err := ioutil.ReadFile("./testdata/sample_payloads/health.json") + healthJSON, err := ioutil.ReadFile("./testdata/sample_payloads/health.json") require.NoError(t, err) actualClusterHealth := model.ClusterHealth{} - require.NoError(t, json.Unmarshal(healthJson, &actualClusterHealth)) + require.NoError(t, json.Unmarshal(healthJSON, &actualClusterHealth)) username := "user" password := "pass" From 78eac94a3a7720f3e78f8ddb8727f7e3a3d41d8e Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 14:49:19 -0500 Subject: [PATCH 12/19] addlicense --- receiver/elasticsearchreceiver/client.go | 14 ++++++++++++++ receiver/elasticsearchreceiver/client_test.go | 14 ++++++++++++++ .../internal/model/clusterhealth.go | 14 ++++++++++++++ .../internal/model/nodestats.go | 14 ++++++++++++++ 4 files changed, 56 insertions(+) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 77ce1cbb1bd5..ecd541ff411d 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package elasticsearchreceiver import ( diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index 133440aed8c5..b25db6a2fc43 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package elasticsearchreceiver import ( diff --git a/receiver/elasticsearchreceiver/internal/model/clusterhealth.go b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go index 2be7dc3c53e5..784ba6fa8e51 100644 --- a/receiver/elasticsearchreceiver/internal/model/clusterhealth.go +++ b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package model // ClusterHealth represents a response from elasticsearch's /_cluster/health endpoint. diff --git a/receiver/elasticsearchreceiver/internal/model/nodestats.go b/receiver/elasticsearchreceiver/internal/model/nodestats.go index c29c409fcca9..909ed9d75830 100644 --- a/receiver/elasticsearchreceiver/internal/model/nodestats.go +++ b/receiver/elasticsearchreceiver/internal/model/nodestats.go @@ -1,3 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package model // NodeStats represents a response from elasticsearch's /_nodes/stats endpoint. From eaf5326b96200356d22a99b1f5a4b736df03e7d7 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 15:14:41 -0500 Subject: [PATCH 13/19] re-order imports in client.go --- receiver/elasticsearchreceiver/client.go | 4 ++-- receiver/elasticsearchreceiver/client_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index ecd541ff411d..7eec87f3f338 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -25,9 +25,9 @@ import ( "net/url" "strings" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" - "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" ) var ( diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index b25db6a2fc43..8269c1324d20 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -24,10 +24,10 @@ import ( "strings" "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" - "github.com/stretchr/testify/require" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" ) func TestNodeStatsNoPassword(t *testing.T) { From a389d21b63182486b0d8d275186f377f43955bdf Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 4 Jan 2022 15:38:22 -0500 Subject: [PATCH 14/19] make porto --- receiver/elasticsearchreceiver/client.go | 2 +- receiver/elasticsearchreceiver/internal/model/clusterhealth.go | 2 +- receiver/elasticsearchreceiver/internal/model/nodestats.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 7eec87f3f338..2e3c0ab1b9ae 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package elasticsearchreceiver +package elasticsearchreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver" import ( "context" diff --git a/receiver/elasticsearchreceiver/internal/model/clusterhealth.go b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go index 784ba6fa8e51..c5adc4b15bd0 100644 --- a/receiver/elasticsearchreceiver/internal/model/clusterhealth.go +++ b/receiver/elasticsearchreceiver/internal/model/clusterhealth.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package model +package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" // ClusterHealth represents a response from elasticsearch's /_cluster/health endpoint. // The struct is not exhaustive; It does not provide all values returned by elasticsearch, diff --git a/receiver/elasticsearchreceiver/internal/model/nodestats.go b/receiver/elasticsearchreceiver/internal/model/nodestats.go index 909ed9d75830..dc8458106625 100644 --- a/receiver/elasticsearchreceiver/internal/model/nodestats.go +++ b/receiver/elasticsearchreceiver/internal/model/nodestats.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package model +package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" // NodeStats represents a response from elasticsearch's /_nodes/stats endpoint. // The struct is not exhaustive; It does not provide all values returned by elasticsearch, From 633cff100014c9978639133afa6dc3401fede032 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 5 Jan 2022 04:50:28 -0500 Subject: [PATCH 15/19] add newline to sample health payload --- .../elasticsearchreceiver/testdata/sample_payloads/health.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/elasticsearchreceiver/testdata/sample_payloads/health.json b/receiver/elasticsearchreceiver/testdata/sample_payloads/health.json index ba76f60d02c1..969681ffaffd 100644 --- a/receiver/elasticsearchreceiver/testdata/sample_payloads/health.json +++ b/receiver/elasticsearchreceiver/testdata/sample_payloads/health.json @@ -14,4 +14,4 @@ "number_of_in_flight_fetch": 0, "task_max_waiting_in_queue_millis": 0, "active_shards_percent_as_number": 100.0 -} \ No newline at end of file +} From 2cd4a108ca9afb166b72977e1cbe9bbf88a13532 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 5 Jan 2022 04:55:13 -0500 Subject: [PATCH 16/19] fix disjoint client error handling --- receiver/elasticsearchreceiver/client.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 2e3c0ab1b9ae..15223d3ee76c 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -128,19 +128,20 @@ func (c defaultElasticsearchClient) doRequest(ctx context.Context, path string) } defer resp.Body.Close() - if resp.StatusCode != 200 { - body, err := io.ReadAll(resp.Body) - c.logger.Debug( - "Failed to make request to Elasticsearch", - zap.String("path", path), - zap.Int("status_code", resp.StatusCode), - zap.ByteString("body", body), - zap.NamedError("body_read_error", err), - ) + if resp.StatusCode == 200 { + return io.ReadAll(resp.Body) } + body, err := io.ReadAll(resp.Body) + c.logger.Debug( + "Failed to make request to Elasticsearch", + zap.String("path", path), + zap.Int("status_code", resp.StatusCode), + zap.ByteString("body", body), + zap.NamedError("body_read_error", err), + ) + switch resp.StatusCode { - case 200: // OK case 401: return nil, errUnauthenticated case 403: @@ -148,6 +149,4 @@ func (c defaultElasticsearchClient) doRequest(ctx context.Context, path string) default: return nil, fmt.Errorf("got non 200 status code %d", resp.StatusCode) } - - return io.ReadAll(resp.Body) } From d4de501eaaef0f3d9c406666deb35102258b78e6 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 5 Jan 2022 05:18:17 -0500 Subject: [PATCH 17/19] take in config struct for client creation --- receiver/elasticsearchreceiver/client.go | 19 +++- receiver/elasticsearchreceiver/client_test.go | 107 +++++++++++++----- 2 files changed, 94 insertions(+), 32 deletions(-) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 15223d3ee76c..6a83e8539360 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -25,6 +25,7 @@ import ( "net/url" "strings" + "go.opentelemetry.io/collector/component" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" @@ -52,10 +53,20 @@ type defaultElasticsearchClient struct { var _ elasticsearchClient = (*defaultElasticsearchClient)(nil) -func newElasticsearchClient(logger *zap.Logger, client *http.Client, endpoint *url.URL, username, password string) *defaultElasticsearchClient { +func newElasticsearchClient(logger *zap.Logger, c Config, h component.Host) (*defaultElasticsearchClient, error) { + client, err := c.HTTPClientSettings.ToClient(h.GetExtensions()) + if err != nil { + return nil, err + } + + endpoint, err := url.Parse(c.Endpoint) + if err != nil { + return nil, err + } + var authHeader string - if username != "" && password != "" { - userPass := fmt.Sprintf("%s:%s", username, password) + if c.Username != "" && c.Password != "" { + userPass := fmt.Sprintf("%s:%s", c.Username, c.Password) authb64 := base64.StdEncoding.EncodeToString([]byte(userPass)) authHeader = fmt.Sprintf("Basic %s", authb64) } @@ -65,7 +76,7 @@ func newElasticsearchClient(logger *zap.Logger, client *http.Client, endpoint *u authHeader: authHeader, endpoint: endpoint, logger: logger, - } + }, nil } // nodeStatsMetrics is a comma separated list of metrics that will be gathered from NodeStats. diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index 8269c1324d20..420f9388efb6 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -20,16 +20,26 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "net/url" "strings" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" ) +func TestCreateClientInvalidEndpoint(t *testing.T) { + _, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: "http://\x00", + }, + }, componenttest.NewNopHost()) + require.Error(t, err) +} + func TestNodeStatsNoPassword(t *testing.T) { nodeJSON, err := ioutil.ReadFile("./testdata/sample_payloads/nodes_linux.json") require.NoError(t, err) @@ -40,10 +50,12 @@ func TestNodeStatsNoPassword(t *testing.T) { elasticsearchMock := mockServer(t, "", "") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") ctx := context.Background() nodeStats, err := client.NodeStats(ctx, []string{"_all"}) require.NoError(t, err) @@ -61,10 +73,13 @@ func TestNodeStatsNilNodes(t *testing.T) { elasticsearchMock := mockServer(t, "", "") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") ctx := context.Background() nodeStats, err := client.NodeStats(ctx, nil) require.NoError(t, err) @@ -85,10 +100,15 @@ func TestNodeStatsAuthentication(t *testing.T) { elasticsearchMock := mockServer(t, username, password) defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + Username: username, + Password: password, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, username, password) ctx := context.Background() nodeStats, err := client.NodeStats(ctx, []string{"_all"}) require.NoError(t, err) @@ -100,10 +120,13 @@ func TestNodeStatsNoAuthentication(t *testing.T) { elasticsearchMock := mockServer(t, "user", "pass") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") ctx := context.Background() _, err = client.NodeStats(ctx, []string{"_all"}) require.ErrorIs(t, err, errUnauthenticated) @@ -113,10 +136,15 @@ func TestNodeStatsBadAuthentication(t *testing.T) { elasticsearchMock := mockServer(t, "user", "pass") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + Username: "bad_user", + Password: "bad_pass", + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "unauthorized_user", "password") ctx := context.Background() _, err = client.NodeStats(ctx, []string{"_all"}) require.ErrorIs(t, err, errUnauthorized) @@ -132,10 +160,13 @@ func TestClusterHealthNoPassword(t *testing.T) { elasticsearchMock := mockServer(t, "", "") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") ctx := context.Background() nodeStats, err := client.ClusterHealth(ctx) require.NoError(t, err) @@ -156,10 +187,15 @@ func TestClusterHealthAuthentication(t *testing.T) { elasticsearchMock := mockServer(t, username, password) defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + Username: username, + Password: password, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, username, password) ctx := context.Background() nodeStats, err := client.ClusterHealth(ctx) require.NoError(t, err) @@ -171,10 +207,13 @@ func TestClusterHealthNoAuthentication(t *testing.T) { elasticsearchMock := mockServer(t, "user", "pass") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") ctx := context.Background() _, err = client.ClusterHealth(ctx) require.ErrorIs(t, err, errUnauthenticated) @@ -184,30 +223,40 @@ func TestClusterHealthNoAuthorization(t *testing.T) { elasticsearchMock := mockServer(t, "user", "pass") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + Username: "bad_user", + Password: "bad_pass", + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "unauthorized_user", "password") ctx := context.Background() _, err = client.ClusterHealth(ctx) require.ErrorIs(t, err, errUnauthorized) } func TestDoRequestBadPath(t *testing.T) { - url, err := url.Parse("http://example.localhost:9200") + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: "http://example.localhost:9200", + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "bad_username", "bad_password") _, err = client.doRequest(context.Background(), "\x7f") require.Error(t, err) } func TestDoRequestClientTimeout(t *testing.T) { - url, err := url.Parse("http://example.localhost:9200") + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: "http://example.localhost:9200", + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "bad_username", "bad_password") - ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -219,11 +268,13 @@ func TestDoRequest404(t *testing.T) { elasticsearchMock := mockServer(t, "", "") defer elasticsearchMock.Close() - url, err := url.Parse(elasticsearchMock.URL) + client, err := newElasticsearchClient(zap.NewNop(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) require.NoError(t, err) - client := newElasticsearchClient(zap.NewNop(), http.DefaultClient, url, "", "") - _, err = client.doRequest(context.Background(), "invalid_path") require.Error(t, err) require.Contains(t, err.Error(), "404") From e15c6d6863d37da04edfa2e66d0af7b2589b55ed Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 5 Jan 2022 05:23:38 -0500 Subject: [PATCH 18/19] omitempty on IOStats --- receiver/elasticsearchreceiver/internal/model/nodestats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/elasticsearchreceiver/internal/model/nodestats.go b/receiver/elasticsearchreceiver/internal/model/nodestats.go index dc8458106625..a4533941377e 100644 --- a/receiver/elasticsearchreceiver/internal/model/nodestats.go +++ b/receiver/elasticsearchreceiver/internal/model/nodestats.go @@ -146,7 +146,7 @@ type HTTPStats struct { type FSStats struct { Total FSTotalStats `json:"total"` - IOStats *IOStats `json:"io_stats"` + IOStats *IOStats `json:"io_stats,omitempty"` } type FSTotalStats struct { From d22803189970a67d5a6d68d64d262fb7bad353df Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 5 Jan 2022 05:41:26 -0500 Subject: [PATCH 19/19] add header to request 7.x compatible response --- receiver/elasticsearchreceiver/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 6a83e8539360..d99809ef89a5 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -133,6 +133,10 @@ func (c defaultElasticsearchClient) doRequest(ctx context.Context, path string) req.Header.Add("Authorization", c.authHeader) } + // See https://www.elastic.co/guide/en/elasticsearch/reference/8.0/api-conventions.html#api-compatibility + // the compatible-with=7 should signal to newer version of Elasticsearch to use the v7.x API format + req.Header.Add("Accept", "application/vnd.elasticsearch+json; compatible-with=7") + resp, err := c.client.Do(req) if err != nil { return nil, err