diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e1a20eccf9c..5901d744935 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add dashboard for `redisenterprise` module. {pull}16752[16752] - Dynamically choose a method for the system/service metricset to support older linux distros. {pull}16902[16902] - Reduce memory usage in `elasticsearch/index` metricset. {issue}16503[16503] {pull}16538[16538] +- Check if CCR feature is available on Elasticsearch cluster before attempting to call CCR APIs from `elasticsearch/ccr` metricset. {issue}16511[16511] {pull}17073[17073] *Packetbeat* diff --git a/metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc b/metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc index e38b8702e5f..04642b5f101 100644 --- a/metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc +++ b/metricbeat/module/elasticsearch/ccr/_meta/docs.asciidoc @@ -1,3 +1,8 @@ -This is the `ccr` metricset of the Elasticsearch module. It interrogates the -Cross Cluster Replication Stats API endpoint to fetch information about shards -in the Elasticsearch cluster that are participating in cross-cluster replication. +This is the `ccr` metricset of the {es} module. It uses the +Cross-Cluster Replication Stats API endpoint to fetch metrics about cross-cluster +replication from the {es} clusters that are participating in cross-cluster +replication. + +If the {es} cluster does not have cross-cluster replication enabled, this metricset +will not collect metrics. A DEBUG log message about this will be emitted in the +Metricbeat log. diff --git a/metricbeat/module/elasticsearch/ccr/ccr.go b/metricbeat/module/elasticsearch/ccr/ccr.go index 20d5cc65779..591f3d12e22 100644 --- a/metricbeat/module/elasticsearch/ccr/ccr.go +++ b/metricbeat/module/elasticsearch/ccr/ccr.go @@ -121,6 +121,16 @@ func (m *MetricSet) checkCCRAvailability(currentElasticsearchVersion *common.Ver return } + xpack, err := elasticsearch.GetXPack(m.HTTP, m.GetServiceURI()) + if err != nil { + return "", errors.Wrap(err, "error determining xpack features") + } + + if !xpack.Features.CCR.Enabled { + message = "the CCR feature is not enabled on your Elasticsearch cluster." + return + } + isAvailable := elastic.IsFeatureAvailable(currentElasticsearchVersion, elasticsearch.CCRStatsAPIAvailableVersion) if !isAvailable { diff --git a/metricbeat/module/elasticsearch/ccr/ccr_test.go b/metricbeat/module/elasticsearch/ccr/ccr_test.go new file mode 100644 index 00000000000..f6d94c739e4 --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/ccr_test.go @@ -0,0 +1,116 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ccr + +import ( + "net/http" + "net/http/httptest" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" + + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +func startESServer(esVersion, license string, ccrEnabled bool) *httptest.Server { + + nodesLocalHandler := func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"nodes": { "foobar": {}}}`)) + } + clusterStateMasterHandler := func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{"master_node": "foobar"}`)) + } + rootHandler := func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + } + w.Write([]byte(`{"version": { "number": "` + esVersion + `" } }`)) + } + licenseHandler := func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{ "license": { "type": "` + license + `" } }`)) + } + xpackHandler := func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(`{ "features": { "ccr": { "enabled": ` + strconv.FormatBool(ccrEnabled) + `}}}`)) + } + ccrStatsHandler := func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "this should never have been called", 418) + } + + mux := http.NewServeMux() + mux.Handle("/_nodes/_local/nodes", http.HandlerFunc(nodesLocalHandler)) + mux.Handle("/_cluster/state/master_node", http.HandlerFunc(clusterStateMasterHandler)) + mux.Handle("/", http.HandlerFunc(rootHandler)) + mux.Handle("/_license", http.HandlerFunc(licenseHandler)) // for 7.0 and above + mux.Handle("/_xpack/license", http.HandlerFunc(licenseHandler)) // for before 7.0 + mux.Handle("/_xpack", http.HandlerFunc(xpackHandler)) + mux.Handle("/_ccr/stats", http.HandlerFunc(ccrStatsHandler)) + + return httptest.NewServer(mux) +} + +func TestCCRNotAvailable(t *testing.T) { + tests := map[string]struct { + esVersion string + license string + ccrEnabled bool + }{ + "old_version": { + "6.4.0", + "platinum", + true, + }, + "low_license": { + "7.6.0", + "basic", + true, + }, + "feature_unavailable": { + "7.6.0", + "platinum", + false, + }, + } + + // Disable license caching for these tests + elasticsearch.LicenseCacheEnabled = false + defer func() { elasticsearch.LicenseCacheEnabled = true }() + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + server := startESServer(test.esVersion, test.license, test.ccrEnabled) + defer server.Close() + + ms := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL)) + events, errs := mbtest.ReportingFetchV2Error(ms) + + require.Empty(t, errs) + require.Empty(t, events) + }) + } +} + +func getConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": elasticsearch.ModuleName, + "metricsets": []string{"ccr"}, + "hosts": []string{host}, + } +} diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index 9698f6ce003..45e8ab17a23 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -363,6 +363,27 @@ func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) { return stackUsage, err } +type XPack struct { + Features struct { + CCR struct { + Enabled bool `json:"enabled"` + } `json:"CCR"` + } `json:"features"` +} + +// GetXPack returns information about xpack features. +func GetXPack(http *helper.HTTP, resetURI string) (XPack, error) { + content, err := fetchPath(http, resetURI, "_xpack", "") + + if err != nil { + return XPack{}, err + } + + var xpack XPack + err = json.Unmarshal(content, &xpack) + return xpack, err +} + // IsMLockAllEnabled returns if the given Elasticsearch node has mlockall enabled func IsMLockAllEnabled(http *helper.HTTP, resetURI, nodeID string) (bool, error) { content, err := fetchPath(http, resetURI, "_nodes/"+nodeID, "filter_path=nodes.*.process.mlockall") @@ -437,8 +458,13 @@ func MergeClusterSettings(clusterSettings common.MapStr) (common.MapStr, error) return settings, nil } -// Global cache for license information. Assumption is that license information changes infrequently. -var licenseCache = &_licenseCache{} +var ( + // Global cache for license information. Assumption is that license information changes infrequently. + licenseCache = &_licenseCache{} + + // LicenseCacheEnabled controls whether license caching is enabled or not. Intended for test use. + LicenseCacheEnabled = true +) type _licenseCache struct { sync.RWMutex @@ -460,6 +486,10 @@ func (c *_licenseCache) get() *License { } func (c *_licenseCache) set(license *License, ttl time.Duration) { + if !LicenseCacheEnabled { + return + } + c.Lock() defer c.Unlock()