From a3a43337d32a0687f4134d47a63fc1ee814f9ed5 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 30 Aug 2018 18:10:50 +0200 Subject: [PATCH] Improve monitoring reporter (#8090) (#8143) Add backoff and failover support to the Elasticsearch monitoring reporter. The monitoring reporter runs in 2 phases. First phase it checks for monitoring being enabled in Elasticsearch. The check runs every 30s. If multiple hosts are configured, one host is selected by random. Once phase 1 succeeds, phase 2 (collection phase) is started. Before this change, phase 2 was configured to use load-balancing without timeout if multiple hosts are configured. With events being dropped on error and only one document being generated every 10s, this was ok in most cases. Still, if one output is blocked, waiting for a long timeout failover to another host can happen, even if no error occured yet. If the failover host has errors, it might end up in a tight reconnect-loop without any backoff behavior. With recent changes to 6.4 beats creates a many more documents, which was not taken into account in original design. Due to this misbehaving monitoring outputs are much more likely: => Problems with reporter 1. Failover was not handled correctly 2. Creating more then one event and potentially spurious errors raise the need for backoff This changes configures the clients to failover mode only. Whenever the connection to one host fails, another host is selected by random. On failure the reporters output will backoff exponentially. If the second client (after failover) also fails, then the backoff waiting times are doubled. And so on. (cherry picked from commit 43ee7d78f2a09fc432865606dfcc7d2e3b14ee99) --- CHANGELOG.asciidoc | 1 + auditbeat/auditbeat.reference.yml | 11 ++++++ filebeat/filebeat.reference.yml | 11 ++++++ heartbeat/heartbeat.reference.yml | 11 ++++++ libbeat/_meta/config.reference.yml | 11 ++++++ .../monitoring/shared-monitor-config.asciidoc | 24 ++++++++++++- .../monitoring/report/elasticsearch/config.go | 10 ++++++ .../report/elasticsearch/elasticsearch.go | 35 ++++++++++++------- metricbeat/metricbeat.reference.yml | 11 ++++++ packetbeat/packetbeat.reference.yml | 11 ++++++ winlogbeat/winlogbeat.reference.yml | 11 ++++++ 11 files changed, 133 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index fe46ebe3dfe..5581aa19aac 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Remove unix-like permission checks on Windows, so files can be opened. {issue}7849[7849] - Replace index patterns in TSVB visualizations. {pull}7929[7929] - Deregister pipeline loader callback when inputsRunner is stopped. {pull}[7893][7893] +- Add backoff support to x-pack monitoring outputs. {issue}7966[7966] *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 43f0017d73b..68d577ab22e 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -1121,6 +1121,17 @@ logging.files: # The default is 50. #bulk_max_size: 50 + # The number of seconds to wait before trying to reconnect to Elasticsearch + # after a network error. After waiting backoff.init seconds, the Beat + # tries to reconnect. If the attempt fails, the backoff timer is increased + # exponentially up to backoff.max. After a successful connection, the backoff + # timer is reset. The default is 1s. + #backoff.init: 1s + + # The maximum number of seconds to wait before attempting to connect to + # Elasticsearch after a network error. The default is 60s. + #backoff.max: 60s + # Configure http request timeout before failing an request to Elasticsearch. #timeout: 90 diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index d89359f34b1..f1265d5be43 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1781,6 +1781,17 @@ logging.files: # The default is 50. #bulk_max_size: 50 + # The number of seconds to wait before trying to reconnect to Elasticsearch + # after a network error. After waiting backoff.init seconds, the Beat + # tries to reconnect. If the attempt fails, the backoff timer is increased + # exponentially up to backoff.max. After a successful connection, the backoff + # timer is reset. The default is 1s. + #backoff.init: 1s + + # The maximum number of seconds to wait before attempting to connect to + # Elasticsearch after a network error. The default is 60s. + #backoff.max: 60s + # Configure http request timeout before failing an request to Elasticsearch. #timeout: 90 diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 70488a86b17..ca2a1134553 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -1228,6 +1228,17 @@ logging.files: # The default is 50. #bulk_max_size: 50 + # The number of seconds to wait before trying to reconnect to Elasticsearch + # after a network error. After waiting backoff.init seconds, the Beat + # tries to reconnect. If the attempt fails, the backoff timer is increased + # exponentially up to backoff.max. After a successful connection, the backoff + # timer is reset. The default is 1s. + #backoff.init: 1s + + # The maximum number of seconds to wait before attempting to connect to + # Elasticsearch after a network error. The default is 60s. + #backoff.max: 60s + # Configure http request timeout before failing an request to Elasticsearch. #timeout: 90 diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index 27d2e4205d3..342e70dbd98 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -1014,6 +1014,17 @@ logging.files: # The default is 50. #bulk_max_size: 50 + # The number of seconds to wait before trying to reconnect to Elasticsearch + # after a network error. After waiting backoff.init seconds, the Beat + # tries to reconnect. If the attempt fails, the backoff timer is increased + # exponentially up to backoff.max. After a successful connection, the backoff + # timer is reset. The default is 1s. + #backoff.init: 1s + + # The maximum number of seconds to wait before attempting to connect to + # Elasticsearch after a network error. The default is 60s. + #backoff.max: 60s + # Configure http request timeout before failing an request to Elasticsearch. #timeout: 90 diff --git a/libbeat/docs/monitoring/shared-monitor-config.asciidoc b/libbeat/docs/monitoring/shared-monitor-config.asciidoc index 2990d8ef7e3..2ea94649b45 100644 --- a/libbeat/docs/monitoring/shared-monitor-config.asciidoc +++ b/libbeat/docs/monitoring/shared-monitor-config.asciidoc @@ -39,6 +39,21 @@ configuration option contains the following fields: The maximum number of metrics to bulk in a single {es} bulk API index request. The default is `50`. For more information, see <>. +[float] +==== `backoff.init` + +The number of seconds to wait before trying to reconnect to Elasticsearch after +a network error. After waiting `backoff.init` seconds, {beatname_uc} tries to +reconnect. If the attempt fails, the backoff timer is increased exponentially up +to `backoff.max`. After a successful connection, the backoff timer is reset. The +default is 1s. + +[float] +===== `backoff.max` + +The maximum number of seconds to wait before attempting to connect to +Elasticsearch after a network error. The default is 60s. + [float] ==== `compression_level` @@ -79,10 +94,17 @@ The password that {beatname_uc} uses to authenticate with the {es} instances for shipping monitoring data. [float] -==== `period` +==== `metrics.period` The time interval (in seconds) when metrics are sent to the {es} cluster. A new snapshot of {beatname_uc} metrics is generated and scheduled for publishing each +period. The default value is 10 * time.Second. + +[float] +==== `state.period` + +The time interval (in seconds) when state information are sent to the {es} cluster. A new +snapshot of {beatname_uc} state is generated and scheduled for publishing each period. The default value is 60 * time.Second. [float] diff --git a/libbeat/monitoring/report/elasticsearch/config.go b/libbeat/monitoring/report/elasticsearch/config.go index 2856e6d88b8..8f59cf79bad 100644 --- a/libbeat/monitoring/report/elasticsearch/config.go +++ b/libbeat/monitoring/report/elasticsearch/config.go @@ -42,6 +42,12 @@ type config struct { BulkMaxSize int `config:"bulk_max_size" validate:"min=0"` BufferSize int `config:"buffer_size"` Tags []string `config:"tags"` + Backoff backoff `config:"backoff"` +} + +type backoff struct { + Init time.Duration + Max time.Duration } var defaultConfig = config{ @@ -61,4 +67,8 @@ var defaultConfig = config{ BulkMaxSize: 50, BufferSize: 50, Tags: nil, + Backoff: backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 7c9357abdd7..83115a9fe95 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -54,7 +54,8 @@ type reporter struct { // pipeline pipeline *pipeline.Pipeline client beat.Client - out outputs.Group + + out []outputs.NetworkClient } var debugf = logp.MakeDebug("monitoring") @@ -104,22 +105,21 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { params[k] = v } - out := outputs.Group{ - Clients: nil, - BatchSize: windowSize, - Retry: 0, // no retry. on error drop events - } - hosts, err := outputs.ReadHostList(cfg) if err != nil { return nil, err } + if len(hosts) == 0 { + return nil, errors.New("empty hosts list") + } + + var clients []outputs.NetworkClient for _, host := range hosts { client, err := makeClient(host, params, proxyURL, tlsConfig, &config) if err != nil { return nil, err } - out.Clients = append(out.Clients, client) + clients = append(clients, client) } queueFactory := func(e queue.Eventer) (queue.Queue, error) { @@ -131,10 +131,19 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { monitoring := monitoring.Default.GetRegistry("xpack.monitoring") + outClient := outputs.NewFailoverClient(clients) + outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max) + pipeline, err := pipeline.New( beat, monitoring, - queueFactory, out, pipeline.Settings{ + queueFactory, + outputs.Group{ + Clients: []outputs.Client{outClient}, + BatchSize: windowSize, + Retry: 0, // no retry. Drop event on error. + }, + pipeline.Settings{ WaitClose: 0, WaitCloseMode: pipeline.NoWaitOnClose, }) @@ -142,7 +151,7 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { return nil, err } - client, err := pipeline.Connect() + pipeConn, err := pipeline.Connect() if err != nil { pipeline.Close() return nil, err @@ -154,8 +163,8 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) { tags: config.Tags, checkRetry: checkRetry, pipeline: pipeline, - client: client, - out: out, + client: pipeConn, + out: clients, } go r.initLoop(config) return r, nil @@ -175,7 +184,7 @@ func (r *reporter) initLoop(c config) { for { // Select one configured endpoint by random and check if xpack is available - client := r.out.Clients[rand.Intn(len(r.out.Clients))].(outputs.NetworkClient) + client := r.out[rand.Intn(len(r.out))] err := client.Connect() if err == nil { closing(client) diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 51342480ebe..cf4df32f416 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1688,6 +1688,17 @@ logging.files: # The default is 50. #bulk_max_size: 50 + # The number of seconds to wait before trying to reconnect to Elasticsearch + # after a network error. After waiting backoff.init seconds, the Beat + # tries to reconnect. If the attempt fails, the backoff timer is increased + # exponentially up to backoff.max. After a successful connection, the backoff + # timer is reset. The default is 1s. + #backoff.init: 1s + + # The maximum number of seconds to wait before attempting to connect to + # Elasticsearch after a network error. The default is 60s. + #backoff.max: 60s + # Configure http request timeout before failing an request to Elasticsearch. #timeout: 90 diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 6b6b0a1be61..2121bc8f969 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -1491,6 +1491,17 @@ logging.files: # The default is 50. #bulk_max_size: 50 + # The number of seconds to wait before trying to reconnect to Elasticsearch + # after a network error. After waiting backoff.init seconds, the Beat + # tries to reconnect. If the attempt fails, the backoff timer is increased + # exponentially up to backoff.max. After a successful connection, the backoff + # timer is reset. The default is 1s. + #backoff.init: 1s + + # The maximum number of seconds to wait before attempting to connect to + # Elasticsearch after a network error. The default is 60s. + #backoff.max: 60s + # Configure http request timeout before failing an request to Elasticsearch. #timeout: 90 diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 142075880f1..24409308316 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -1043,6 +1043,17 @@ logging.files: # The default is 50. #bulk_max_size: 50 + # The number of seconds to wait before trying to reconnect to Elasticsearch + # after a network error. After waiting backoff.init seconds, the Beat + # tries to reconnect. If the attempt fails, the backoff timer is increased + # exponentially up to backoff.max. After a successful connection, the backoff + # timer is reset. The default is 1s. + #backoff.init: 1s + + # The maximum number of seconds to wait before attempting to connect to + # Elasticsearch after a network error. The default is 60s. + #backoff.max: 60s + # Configure http request timeout before failing an request to Elasticsearch. #timeout: 90