diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 25b09e7bf86..331ebadff76 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Report number of open file handles on Windows. {pull}8329[8329] - Support for Kafka 2.0.0 in kafka output {pull}8399[8399] - Add setting `setup.kibana.space.id` to support Kibana Spaces {pull}7942[7942] +- Add Beats Central Management {pull}8559[8559] *Auditbeat* diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 84e8c428804..9423b774dd1 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -19,6 +19,7 @@ package beat import ( "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/management" ) // Creator initializes and configures a new Beater instance used to execute @@ -64,6 +65,8 @@ type Beat struct { BeatConfig *common.Config // The beat's own configuration section Fields []byte // Data from fields.yml + + ConfigManager management.ConfigManager // config manager } // BeatConfig struct contains the basic configuration of every beat diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 548e2651411..9885ae05d63 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -35,6 +35,9 @@ import ( "github.com/satori/go.uuid" "go.uber.org/zap" + "github.com/elastic/go-sysinfo" + "github.com/elastic/go-sysinfo/types" + "github.com/elastic/beats/libbeat/api" "github.com/elastic/beats/libbeat/asset" "github.com/elastic/beats/libbeat/beat" @@ -43,11 +46,13 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/file" + "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/common/seccomp" "github.com/elastic/beats/libbeat/dashboards" "github.com/elastic/beats/libbeat/keystore" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/logp/configure" + "github.com/elastic/beats/libbeat/management" "github.com/elastic/beats/libbeat/metric/system/host" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/monitoring/report" @@ -59,8 +64,6 @@ import ( svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/template" "github.com/elastic/beats/libbeat/version" - "github.com/elastic/go-sysinfo" - "github.com/elastic/go-sysinfo/types" // Register publisher pipeline modules _ "github.com/elastic/beats/libbeat/publisher/includes" @@ -313,6 +316,8 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { return nil, fmt.Errorf("error initializing publisher: %+v", err) } + reload.Register.MustRegister("output", pipeline.OutputReloader()) + // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, // but refine publisher to disconnect clients on stop automatically // defer pipeline.Close() @@ -393,6 +398,10 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { api.Start(b.Config.HTTP) } + // Launch config manager + b.ConfigManager.Start() + defer b.ConfigManager.Stop() + return beater.Run(&b.Beat) } @@ -573,6 +582,16 @@ func (b *Beat) configure(settings Settings) error { logp.Info("Beat UUID: %v", b.Info.UUID) + // initialize config manager + b.ConfigManager, err = management.Factory()(reload.Register, b.Beat.Info.UUID) + if err != nil { + return err + } + + if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil { + return err + } + if maxProcs := b.Config.MaxProcs; maxProcs > 0 { runtime.GOMAXPROCS(maxProcs) } diff --git a/libbeat/common/cli/password.go b/libbeat/common/cli/password.go new file mode 100644 index 00000000000..f7b51efe1ea --- /dev/null +++ b/libbeat/common/cli/password.go @@ -0,0 +1,78 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cli + +import ( + "fmt" + "os" + "strings" + "syscall" + + "github.com/pkg/errors" + "golang.org/x/crypto/ssh/terminal" +) + +type method func(m string) (string, error) + +var methods = map[string]method{ + "stdin": stdin, + "env": env, +} + +// ReadPassword allows to read a password passed as a command line parameter. +// It offers several ways to read the password so it is not directly passed as a plain text argument: +// stdin - Will prompt the user to input the password +// env:VAR_NAME - Will read the password from the given env variable +// +func ReadPassword(def string) (string, error) { + if len(def) == 0 { + return "", errors.New("empty password definition") + } + + var method, params string + parts := strings.SplitN(def, ":", 2) + method = strings.ToLower(parts[0]) + + if len(parts) == 2 { + params = parts[1] + } + + m := methods[method] + if m == nil { + return "", errors.New("unknown password source, use stdin or env:VAR_NAME") + } + + return m(params) +} + +func stdin(p string) (string, error) { + fmt.Print("Enter password: ") + bytePassword, err := terminal.ReadPassword(int(syscall.Stdin)) + if err != nil { + return "", errors.Wrap(err, "reading password input") + } + return string(bytePassword), nil +} + +func env(p string) (string, error) { + if len(p) == 0 { + return "", errors.New("env variable name is needed when using env: password method") + } + + return os.Getenv(p), nil +} diff --git a/libbeat/common/cli/password_test.go b/libbeat/common/cli/password_test.go new file mode 100644 index 00000000000..d9716cd310b --- /dev/null +++ b/libbeat/common/cli/password_test.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cli + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadPassword(t *testing.T) { + os.Setenv("FOO", "random") + + tests := []struct { + name string + input string + password string + error bool + }{ + { + name: "Test env variable", + input: "env:FOO", + password: "random", + }, + { + name: "Test unknown method", + input: "foo:bar", + error: true, + }, + { + name: "Test empty input", + input: "", + error: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + password, err := ReadPassword(test.input) + assert.Equal(t, test.password, password) + + if test.error { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/libbeat/common/config.go b/libbeat/common/config.go index b1d5aba7ec5..2ca6ad859b5 100644 --- a/libbeat/common/config.go +++ b/libbeat/common/config.go @@ -28,11 +28,11 @@ import ( "strings" ucfg "github.com/elastic/go-ucfg" + "github.com/elastic/go-ucfg/cfgutil" "github.com/elastic/go-ucfg/yaml" "github.com/elastic/beats/libbeat/common/file" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/go-ucfg/cfgutil" ) var flagStrictPerms = flag.Bool("strict.perms", true, "Strict permission checking on config files") diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index b3f61c26727..1f8b3ab4a73 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -26,7 +26,7 @@ import ( ) // Register holds a registry of reloadable objects -var Register = newRegistry() +var Register = NewRegistry() // ConfigWithMeta holds a pair of common.Config and optional metadata for it type ConfigWithMeta struct { @@ -47,21 +47,23 @@ type Reloadable interface { Reload(config *ConfigWithMeta) error } -type registry struct { +// Registry of reloadable objects and lists +type Registry struct { sync.RWMutex confsLists map[string]ReloadableList confs map[string]Reloadable } -func newRegistry() *registry { - return ®istry{ +// NewRegistry initializes and returns a reload registry +func NewRegistry() *Registry { + return &Registry{ confsLists: make(map[string]ReloadableList), confs: make(map[string]Reloadable), } } // Register declares a reloadable object -func (r *registry) Register(name string, obj Reloadable) error { +func (r *Registry) Register(name string, obj Reloadable) error { r.Lock() defer r.Unlock() @@ -78,7 +80,7 @@ func (r *registry) Register(name string, obj Reloadable) error { } // RegisterList declares a reloadable list of configurations -func (r *registry) RegisterList(name string, list ReloadableList) error { +func (r *Registry) RegisterList(name string, list ReloadableList) error { r.Lock() defer r.Unlock() @@ -95,34 +97,34 @@ func (r *registry) RegisterList(name string, list ReloadableList) error { } // MustRegister declares a reloadable object -func (r *registry) MustRegister(name string, obj Reloadable) { +func (r *Registry) MustRegister(name string, obj Reloadable) { if err := r.Register(name, obj); err != nil { panic(err) } } // MustRegisterList declares a reloadable object list -func (r *registry) MustRegisterList(name string, list ReloadableList) { +func (r *Registry) MustRegisterList(name string, list ReloadableList) { if err := r.RegisterList(name, list); err != nil { panic(err) } } -// Get returns the reloadable object with the given name, nil if not found -func (r *registry) Get(name string) Reloadable { +// GetReloadable returns the reloadable object with the given name, nil if not found +func (r *Registry) GetReloadable(name string) Reloadable { r.RLock() defer r.RUnlock() return r.confs[name] } -// GetList returns the reloadable list with the given name, nil if not found -func (r *registry) GetList(name string) ReloadableList { +// GetReloadableList returns the reloadable list with the given name, nil if not found +func (r *Registry) GetReloadableList(name string) ReloadableList { r.RLock() defer r.RUnlock() return r.confsLists[name] } -func (r *registry) nameTaken(name string) bool { +func (r *Registry) nameTaken(name string) bool { if _, ok := r.confs[name]; ok { return true } diff --git a/libbeat/common/reload/reload_test.go b/libbeat/common/reload/reload_test.go index e2238895375..04c478f7476 100644 --- a/libbeat/common/reload/reload_test.go +++ b/libbeat/common/reload/reload_test.go @@ -29,26 +29,26 @@ type reloadableList struct{} func (reloadable) Reload(config *ConfigWithMeta) error { return nil } func (reloadableList) Reload(config []*ConfigWithMeta) error { return nil } -func RegisterReloadable(t *testing.T) { +func TestRegisterReloadable(t *testing.T) { obj := reloadable{} - r := newRegistry() + r := NewRegistry() r.Register("my.reloadable", obj) - assert.Equal(t, obj, r.Get("my.reloadable")) + assert.Equal(t, obj, r.GetReloadable("my.reloadable")) } -func RegisterReloadableList(t *testing.T) { +func TestRegisterReloadableList(t *testing.T) { objl := reloadableList{} - r := newRegistry() + r := NewRegistry() r.RegisterList("my.reloadable", objl) - assert.Equal(t, objl, r.Get("my.reloadable")) + assert.Equal(t, objl, r.GetReloadableList("my.reloadable")) } func TestRegisterNilFails(t *testing.T) { - r := newRegistry() + r := NewRegistry() err := r.Register("name", nil) assert.Error(t, err) @@ -58,7 +58,7 @@ func TestRegisterNilFails(t *testing.T) { } func TestReRegisterFails(t *testing.T) { - r := newRegistry() + r := NewRegistry() // two obj with the same name err := r.Register("name", reloadable{}) diff --git a/libbeat/kibana/client.go b/libbeat/kibana/client.go index 2efa9b0bfa7..75323a3b655 100644 --- a/libbeat/kibana/client.go +++ b/libbeat/kibana/client.go @@ -145,8 +145,10 @@ func NewClientWithConfig(config *ClientConfig) (*Client, error) { }, } - if err = client.SetVersion(); err != nil { - return nil, fmt.Errorf("fail to get the Kibana version: %v", err) + if !config.IgnoreVersion { + if err = client.SetVersion(); err != nil { + return nil, fmt.Errorf("fail to get the Kibana version: %v", err) + } } return client, nil diff --git a/libbeat/kibana/client_config.go b/libbeat/kibana/client_config.go index c5dc101075c..277fc3fdf6a 100644 --- a/libbeat/kibana/client_config.go +++ b/libbeat/kibana/client_config.go @@ -25,14 +25,15 @@ import ( // ClientConfig to connect to Kibana type ClientConfig struct { - Protocol string `config:"protocol"` - Host string `config:"host"` - Path string `config:"path"` - SpaceID string `config:"space.id"` - Username string `config:"username"` - Password string `config:"password"` - TLS *tlscommon.Config `config:"ssl"` - Timeout time.Duration `config:"timeout"` + Protocol string `config:"protocol"` + Host string `config:"host"` + Path string `config:"path"` + SpaceID string `config:"space.id"` + Username string `config:"username"` + Password string `config:"password"` + TLS *tlscommon.Config `config:"ssl"` + Timeout time.Duration `config:"timeout"` + IgnoreVersion bool } var ( diff --git a/libbeat/management/management.go b/libbeat/management/management.go new file mode 100644 index 00000000000..4a574f79f34 --- /dev/null +++ b/libbeat/management/management.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package management + +import ( + "github.com/satori/go.uuid" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/libbeat/feature" +) + +// Namespace is the feature namespace for queue definition. +var Namespace = "libbeat.management" + +// DebugK used as key for all things central management +var DebugK = "centralmgmt" + +// ConfigManager interacts with the beat to update configurations +// from an external source +type ConfigManager interface { + // Enabled returns true if config manager is enabled + Enabled() bool + + // Start the config manager + Start() + + // Stop the config manager + Stop() + + // CheckRawConfig check settings are correct before launching the beat + CheckRawConfig(cfg *common.Config) error +} + +// FactoryFunc for creating a config manager +type FactoryFunc func(*reload.Registry, uuid.UUID) (ConfigManager, error) + +// Register a config manager +func Register(name string, fn FactoryFunc, stability feature.Stability) { + f := feature.New(Namespace, name, fn, feature.NewDetails(name, "", stability)) + feature.MustRegister(f) +} + +// Factory retrieves config manager constructor. If no one is registered +// it will create a nil manager +func Factory() FactoryFunc { + factories, err := feature.Registry.LookupAll(Namespace) + if err != nil { + return nilFactory + } + + for _, f := range factories { + if factory, ok := f.Factory().(FactoryFunc); ok { + return factory + } + } + + return nilFactory +} + +// nilManager, fallback when no manager is present +type nilManager struct{} + +func nilFactory(*reload.Registry, uuid.UUID) (ConfigManager, error) { return nilManager{}, nil } + +func (nilManager) Enabled() bool { return false } +func (nilManager) Start() {} +func (nilManager) Stop() {} +func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index c25e67e5eed..905eae91393 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -172,6 +172,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) pipeline, err := pipeline.New( beat, + pipeline.Monitors{}, monitoring, queueFactory, outputs.Group{ diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 303792adb9b..908f5619fdd 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -18,6 +18,9 @@ package pipeline import ( + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher/queue" @@ -28,6 +31,8 @@ import ( // - stop // - reload type outputController struct { + beat beat.Info + monitors Monitors logger *logp.Logger observer outputObserver @@ -56,11 +61,15 @@ type outputWorker interface { } func newOutputController( + beat beat.Info, + monitors Monitors, log *logp.Logger, observer outputObserver, b queue.Queue, ) *outputController { c := &outputController{ + beat: beat, + monitors: monitors, logger: log, observer: observer, queue: b, @@ -128,6 +137,8 @@ func (c *outputController) Set(outGrp outputs.Group) { } } + c.out = grp + // restart consumer (potentially blocked by retryer) c.consumer.sigContinue() @@ -137,3 +148,21 @@ func (c *outputController) Set(outGrp outputs.Group) { func makeWorkQueue() workQueue { return workQueue(make(chan *Batch, 0)) } + +// Reload the output +func (c *outputController) Reload(cfg *reload.ConfigWithMeta) error { + outputCfg := common.ConfigNamespace{} + + if err := cfg.Config.Unpack(&outputCfg); err != nil { + return err + } + + output, err := loadOutput(c.beat, c.monitors, outputCfg) + if err != nil { + return err + } + + c.Set(output) + + return nil +} diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 1920b159022..ec5c2a8fe09 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -103,7 +103,7 @@ func Load( return nil, err } - p, err := New(beatInfo, monitors.Metrics, queueBuilder, out, settings) + p, err := New(beatInfo, monitors, monitors.Metrics, queueBuilder, out, settings) if err != nil { return nil, err } @@ -132,14 +132,17 @@ func loadOutput( return outputs.Fail(errors.New(msg)) } - // TODO: add support to unload/reassign outStats on output reloading - var ( metrics *monitoring.Registry outStats outputs.Observer ) if monitors.Metrics != nil { - metrics = monitors.Metrics.NewRegistry("output") + metrics = monitors.Metrics.GetRegistry("output") + if metrics != nil { + metrics.Clear() + } else { + metrics = monitors.Metrics.NewRegistry("output") + } outStats = outputs.NewStats(metrics) } @@ -152,7 +155,12 @@ func loadOutput( monitoring.NewString(metrics, "type").Set(outcfg.Name()) } if monitors.Telemetry != nil { - telemetry := monitors.Telemetry.NewRegistry("output") + telemetry := monitors.Telemetry.GetRegistry("output") + if telemetry != nil { + telemetry.Clear() + } else { + telemetry = monitors.Telemetry.NewRegistry("output") + } monitoring.NewString(telemetry, "name").Set(outcfg.Name()) } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index ef727ebeb73..71f9ca16d2e 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" @@ -151,6 +153,7 @@ type queueFactory func(queue.Eventer) (queue.Queue, error) // queue and outputs will be closed. func New( beat beat.Info, + monitors Monitors, metrics *monitoring.Registry, queueFactory queueFactory, out outputs.Group, @@ -203,7 +206,7 @@ func New( } p.eventSema = newSema(maxEvents) - p.output = newOutputController(log, p.observer, p.queue) + p.output = newOutputController(beat, monitors, log, p.observer, p.queue) p.output.Set(out) return p, nil @@ -434,3 +437,8 @@ func makePipelineProcessors( return p } + +// OutputReloader returns a reloadable object for the output section of this pipeline +func (p *Pipeline) OutputReloader() reload.Reloadable { + return p.output +} diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index fd5ea64031f..7223fbf757b 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -20,6 +20,9 @@ package beater import ( "sync" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/libbeat/management" + "github.com/joeshaw/multierror" "github.com/pkg/errors" @@ -187,6 +190,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe func (bt *Metricbeat) Run(b *beat.Beat) error { var wg sync.WaitGroup + // Static modules (metricbeat.modules) for _, m := range bt.modules { client, err := m.connector.Connect() if err != nil { @@ -203,9 +207,20 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { }() } + // Centrally managed modules + factory := module.NewFactory(bt.moduleOptions...) + modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher) + reload.Register.MustRegisterList(b.Info.Beat+".modules", modules) + wg.Add(1) + go func() { + defer wg.Done() + <-bt.done + modules.Stop() + }() + + // Dynamic file based modules (metricbeat.config.modules) if bt.config.ConfigModules.Enabled() { moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules) - factory := module.NewFactory(bt.moduleOptions...) if err := moduleReloader.Check(factory); err != nil { return err @@ -220,6 +235,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { }() } + // Autodiscover (metricbeat.autodiscover) if bt.autodiscover != nil { bt.autodiscover.Start() wg.Add(1) diff --git a/x-pack/auditbeat/cmd/root.go b/x-pack/auditbeat/cmd/root.go index 24a64781a3d..682c61362b4 100644 --- a/x-pack/auditbeat/cmd/root.go +++ b/x-pack/auditbeat/cmd/root.go @@ -4,11 +4,14 @@ package cmd -import "github.com/elastic/beats/auditbeat/cmd" +import ( + "github.com/elastic/beats/auditbeat/cmd" + xpackcmd "github.com/elastic/beats/x-pack/libbeat/cmd" +) // RootCmd to handle beats cli var RootCmd = cmd.RootCmd func init() { - // TODO inject x-pack features + xpackcmd.AddXPack(RootCmd, cmd.Name) } diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index dd76fced042..781b473bb5a 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -4,11 +4,14 @@ package cmd -import "github.com/elastic/beats/filebeat/cmd" +import ( + "github.com/elastic/beats/filebeat/cmd" + xpackcmd "github.com/elastic/beats/x-pack/libbeat/cmd" +) // RootCmd to handle beats cli var RootCmd = cmd.RootCmd func init() { - // TODO inject x-pack features + xpackcmd.AddXPack(RootCmd, cmd.Name) } diff --git a/x-pack/heartbeat/cmd/root.go b/x-pack/heartbeat/cmd/root.go index 61146591b51..154d2cf7dcf 100644 --- a/x-pack/heartbeat/cmd/root.go +++ b/x-pack/heartbeat/cmd/root.go @@ -4,11 +4,14 @@ package cmd -import "github.com/elastic/beats/heartbeat/cmd" +import ( + "github.com/elastic/beats/heartbeat/cmd" + xpackcmd "github.com/elastic/beats/x-pack/libbeat/cmd" +) // RootCmd to handle beats cli var RootCmd = cmd.RootCmd func init() { - // TODO inject x-pack features + xpackcmd.AddXPack(RootCmd, cmd.Name) } diff --git a/x-pack/libbeat/cmd/enroll.go b/x-pack/libbeat/cmd/enroll.go new file mode 100644 index 00000000000..a0824f973d7 --- /dev/null +++ b/x-pack/libbeat/cmd/enroll.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "fmt" + + "github.com/pkg/errors" + "github.com/spf13/cobra" + + "github.com/elastic/beats/libbeat/cmd/instance" + "github.com/elastic/beats/libbeat/common/cli" + "github.com/elastic/beats/x-pack/libbeat/management" + "github.com/elastic/beats/x-pack/libbeat/management/api" +) + +func getBeat(name, version string) (*instance.Beat, error) { + b, err := instance.NewBeat(name, "", version) + + if err != nil { + return nil, fmt.Errorf("error creating beat: %s", err) + } + + if err = b.Init(); err != nil { + return nil, fmt.Errorf("error initializing beat: %s", err) + } + + return b, nil +} + +func genEnrollCmd(name, version string) *cobra.Command { + var username, password string + + enrollCmd := cobra.Command{ + Use: "enroll []", + Short: "Enroll in Kibana for Central Management", + Long: `This will enroll in Kibana Beats Central Management. If you pass an enrollment token + it will be used. You can also enroll using a username and password combination.`, + Args: cobra.RangeArgs(1, 2), + Run: cli.RunWith(func(cmd *cobra.Command, args []string) error { + beat, err := getBeat(name, version) + kibanaURL := args[0] + + if username == "" && len(args) == 1 { + return errors.New("You should pass either an enrollment token or use --username flag") + } + + var enrollmentToken string + if len(args) == 2 { + // use given enrollment token + enrollmentToken = args[1] + if err != nil { + return err + } + } else { + // retrieve an enrollment token using username/password + config, err := api.ConfigFromURL(kibanaURL) + if err != nil { + return err + } + + // pass username/password + config.IgnoreVersion = true + config.Username = username + config.Password, err = cli.ReadPassword(password) + if err != nil { + return err + } + + client, err := api.NewClient(config) + if err != nil { + return err + } + enrollmentToken, err = client.CreateEnrollmentToken() + if err != nil { + return errors.Wrap(err, "Creating a new enrollment token") + } + } + + if err = management.Enroll(beat, kibanaURL, enrollmentToken); err != nil { + return errors.Wrap(err, "Error while enrolling") + } + + fmt.Println("Enrolled and ready to retrieve settings from Kibana") + return nil + }), + } + + enrollCmd.Flags().StringVar(&username, "username", "elastic", "Username to use when enrolling without token") + enrollCmd.Flags().StringVar(&password, "password", "stdin", "Method to read the password to use when enrolling without token (stdin or env:VAR_NAME)") + + return &enrollCmd +} diff --git a/x-pack/libbeat/cmd/inject.go b/x-pack/libbeat/cmd/inject.go new file mode 100644 index 00000000000..715f3d2df5c --- /dev/null +++ b/x-pack/libbeat/cmd/inject.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "github.com/elastic/beats/libbeat/cmd" + + // register central management + _ "github.com/elastic/beats/x-pack/libbeat/management" +) + +// AddXPack extends the given root folder with XPack features +func AddXPack(root *cmd.BeatsRootCmd, name string) { + root.AddCommand(genEnrollCmd(name, "")) +} diff --git a/x-pack/libbeat/management/api/client.go b/x-pack/libbeat/management/api/client.go new file mode 100644 index 00000000000..1516c13ce11 --- /dev/null +++ b/x-pack/libbeat/management/api/client.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "bytes" + "encoding/json" + "net/http" + "net/url" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/kibana" +) + +const defaultTimeout = 10 * time.Second + +// Client to Central Management API +type Client struct { + client *kibana.Client +} + +// ConfigFromURL generates a full kibana client config from an URL +func ConfigFromURL(kibanaURL string) (*kibana.ClientConfig, error) { + data, err := url.Parse(kibanaURL) + if err != nil { + return nil, err + } + + var username, password string + if data.User != nil { + username = data.User.Username() + password, _ = data.User.Password() + } + + return &kibana.ClientConfig{ + Protocol: data.Scheme, + Host: data.Host, + Path: data.Path, + Username: username, + Password: password, + Timeout: defaultTimeout, + }, nil +} + +// NewClient creates and returns a kibana client +func NewClient(cfg *kibana.ClientConfig) (*Client, error) { + client, err := kibana.NewClientWithConfig(cfg) + if err != nil { + return nil, err + } + return &Client{ + client: client, + }, nil +} + +// do a request to the API and unmarshall the message, error if anything fails +func (c *Client) request(method, extraPath string, + params common.MapStr, headers http.Header, message interface{}) (int, error) { + + paramsJSON, err := json.Marshal(params) + if err != nil { + return 400, err + } + + statusCode, result, err := c.client.Request(method, extraPath, nil, headers, bytes.NewBuffer(paramsJSON)) + if err != nil { + return statusCode, err + } + + if statusCode >= 300 { + err = extractError(result) + } else { + if err = json.Unmarshal(result, message); err != nil { + return statusCode, errors.Wrap(err, "error unmarshaling Kibana response") + } + } + + return statusCode, err +} + +func extractError(result []byte) error { + var kibanaResult struct { + Message string + } + if err := json.Unmarshal(result, &kibanaResult); err != nil { + return errors.Wrap(err, "parsing Kibana response") + } + return errors.New(kibanaResult.Message) +} diff --git a/x-pack/libbeat/management/api/client_test.go b/x-pack/libbeat/management/api/client_test.go new file mode 100644 index 00000000000..d7acdba0122 --- /dev/null +++ b/x-pack/libbeat/management/api/client_test.go @@ -0,0 +1,35 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *Client) { + mux := http.NewServeMux() + mux.Handle("/api/status", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Unauthorized", 401) + })) + mux.Handle("/", handler) + + server := httptest.NewServer(mux) + + config, err := ConfigFromURL(server.URL) + if err != nil { + t.Fatal(err) + } + + config.IgnoreVersion = true + + client, err := NewClient(config) + if err != nil { + t.Fatal(err) + } + + return server, client +} diff --git a/x-pack/libbeat/management/api/configuration.go b/x-pack/libbeat/management/api/configuration.go new file mode 100644 index 00000000000..ce37bee2692 --- /dev/null +++ b/x-pack/libbeat/management/api/configuration.go @@ -0,0 +1,88 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "net/http" + "reflect" + + "github.com/elastic/beats/libbeat/common/reload" + + uuid "github.com/satori/go.uuid" + + "github.com/elastic/beats/libbeat/common" +) + +// ConfigBlock stores a piece of config from central management +type ConfigBlock struct { + Raw map[string]interface{} +} + +// ConfigBlocksWithType is a list of config blocks with the same type +type ConfigBlocksWithType struct { + Type string + Blocks []*ConfigBlock +} + +// ConfigBlocks holds a list of type + configs objects +type ConfigBlocks []ConfigBlocksWithType + +// Config returns a common.Config object holding the config from this block +func (c *ConfigBlock) Config() (*common.Config, error) { + return common.NewConfigFrom(c.Raw) +} + +// ConfigWithMeta returns a reload.ConfigWithMeta object holding the config from this block, meta will be nil +func (c *ConfigBlock) ConfigWithMeta() (*reload.ConfigWithMeta, error) { + config, err := c.Config() + if err != nil { + return nil, err + } + return &reload.ConfigWithMeta{ + Config: config, + }, nil +} + +// Configuration retrieves the list of configuration blocks from Kibana +func (c *Client) Configuration(accessToken string, beatUUID uuid.UUID) (ConfigBlocks, error) { + headers := http.Header{} + headers.Set("kbn-beats-access-token", accessToken) + + resp := struct { + ConfigBlocks []*struct { + Type string `json:"type"` + Raw map[string]interface{} `json:"config"` + } `json:"configuration_blocks"` + }{} + _, err := c.request("GET", "/api/beats/agent/"+beatUUID.String()+"/configuration", nil, headers, &resp) + if err != nil { + return nil, err + } + + blocks := map[string][]*ConfigBlock{} + for _, block := range resp.ConfigBlocks { + blocks[block.Type] = append(blocks[block.Type], &ConfigBlock{Raw: block.Raw}) + } + + res := ConfigBlocks{} + for t, b := range blocks { + res = append(res, ConfigBlocksWithType{Type: t, Blocks: b}) + } + + return res, nil +} + +// ConfigBlocksEqual returns true if the given config blocks are equal, false if not +func ConfigBlocksEqual(a, b ConfigBlocks) bool { + if len(a) != len(b) { + return false + } + + if len(a) == 0 { + return true + } + + return reflect.DeepEqual(a, b) +} diff --git a/x-pack/libbeat/management/api/configuration_test.go b/x-pack/libbeat/management/api/configuration_test.go new file mode 100644 index 00000000000..1be98917f81 --- /dev/null +++ b/x-pack/libbeat/management/api/configuration_test.go @@ -0,0 +1,166 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "fmt" + "net/http" + "testing" + + "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" +) + +func TestConfiguration(t *testing.T) { + beatUUID := uuid.NewV4() + + server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check correct path is used + assert.Equal(t, "/api/beats/agent/"+beatUUID.String()+"/configuration", r.URL.Path) + + // Check enrollment token is correct + assert.Equal(t, "thisismyenrollmenttoken", r.Header.Get("kbn-beats-access-token")) + + fmt.Fprintf(w, `{"configuration_blocks":[{"type":"filebeat.modules","config":{"module":"apache2"}},{"type":"metricbeat.modules","config":{"module":"system","period":"10s"}}]}`) + })) + defer server.Close() + + configs, err := client.Configuration("thisismyenrollmenttoken", beatUUID) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 2, len(configs)) + checked := 0 + for _, config := range configs { + if config.Type == "metricbeat.modules" { + assert.Equal(t, &ConfigBlock{Raw: map[string]interface{}{ + "module": "system", + "period": "10s", + }}, config.Blocks[0]) + checked++ + + } else if config.Type == "filebeat.modules" { + assert.Equal(t, &ConfigBlock{Raw: map[string]interface{}{ + "module": "apache2", + }}, config.Blocks[0]) + checked++ + } + } + + assert.Equal(t, 2, checked) +} + +func TestConfigBlocksEqual(t *testing.T) { + tests := []struct { + name string + a, b ConfigBlocks + equal bool + }{ + { + name: "empty lists or nil", + a: nil, + b: ConfigBlocks{}, + equal: true, + }, + { + name: "single element", + a: ConfigBlocks{ + ConfigBlocksWithType{ + Type: "metricbeat.modules", + Blocks: []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + }, + b: ConfigBlocks{ + ConfigBlocksWithType{ + Type: "metricbeat.modules", + Blocks: []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + }, + equal: true, + }, + { + name: "different number of blocks", + a: ConfigBlocks{ + ConfigBlocksWithType{ + Type: "metricbeat.modules", + Blocks: []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + &ConfigBlock{ + Raw: map[string]interface{}{ + "baz": "buzz", + }, + }, + }, + }, + }, + b: ConfigBlocks{ + ConfigBlocksWithType{ + Type: "metricbeat.modules", + Blocks: []*ConfigBlock{ + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + }, + equal: false, + }, + { + name: "different block", + a: ConfigBlocks{ + ConfigBlocksWithType{ + Type: "metricbeat.modules", + Blocks: []*ConfigBlock{ + + &ConfigBlock{ + Raw: map[string]interface{}{ + "baz": "buzz", + }, + }, + }, + }, + }, + b: ConfigBlocks{ + ConfigBlocksWithType{ + Type: "metricbeat.modules", + Blocks: []*ConfigBlock{ + + &ConfigBlock{ + Raw: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, + }, + equal: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.equal, ConfigBlocksEqual(test.a, test.b)) + }) + } +} diff --git a/x-pack/libbeat/management/api/enroll.go b/x-pack/libbeat/management/api/enroll.go new file mode 100644 index 00000000000..4b836802c10 --- /dev/null +++ b/x-pack/libbeat/management/api/enroll.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "net/http" + + uuid "github.com/satori/go.uuid" + + "github.com/elastic/beats/libbeat/common" +) + +// Enroll a beat in central management, this call returns a valid access token to retrieve configurations +func (c *Client) Enroll(beatType, beatName, beatVersion, hostname string, beatUUID uuid.UUID, enrollmentToken string) (string, error) { + params := common.MapStr{ + "type": beatType, + "name": beatName, + "version": beatVersion, + "host_name": hostname, + } + + resp := struct { + AccessToken string `json:"access_token"` + }{} + + headers := http.Header{} + headers.Set("kbn-beats-enrollment-token", enrollmentToken) + + _, err := c.request("POST", "/api/beats/agent/"+beatUUID.String(), params, headers, &resp) + if err != nil { + return "", err + } + + return resp.AccessToken, err +} diff --git a/x-pack/libbeat/management/api/enroll_test.go b/x-pack/libbeat/management/api/enroll_test.go new file mode 100644 index 00000000000..a94ab638a0c --- /dev/null +++ b/x-pack/libbeat/management/api/enroll_test.go @@ -0,0 +1,73 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "testing" + + uuid "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" +) + +func TestEnrollValid(t *testing.T) { + beatUUID := uuid.NewV4() + + server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + + // Check correct path is used + assert.Equal(t, "/api/beats/agent/"+beatUUID.String(), r.URL.Path) + + // Check enrollment token is correct + assert.Equal(t, "thisismyenrollmenttoken", r.Header.Get("kbn-beats-enrollment-token")) + + request := struct { + Hostname string `json:"host_name"` + Type string `json:"type"` + Version string `json:"version"` + Name string `json:"name"` + }{} + if err := json.Unmarshal(body, &request); err != nil { + t.Fatal(err) + } + + assert.Equal(t, "myhostname.lan", request.Hostname) + assert.Equal(t, "metricbeat", request.Type) + assert.Equal(t, "6.3.0", request.Version) + assert.Equal(t, "beatname", request.Name) + + fmt.Fprintf(w, `{"access_token": "fooo"}`) + })) + defer server.Close() + + accessToken, err := client.Enroll("metricbeat", "beatname", "6.3.0", "myhostname.lan", beatUUID, "thisismyenrollmenttoken") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, "fooo", accessToken) +} + +func TestEnrollError(t *testing.T) { + beatUUID := uuid.NewV4() + + server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, `{"message": "Invalid enrollment token"}`, 400) + })) + defer server.Close() + + accessToken, err := client.Enroll("metricbeat", "beatname", "6.3.0", "myhostname.lan", beatUUID, "thisismyenrollmenttoken") + + assert.NotNil(t, err) + assert.Equal(t, "", accessToken) +} diff --git a/x-pack/libbeat/management/api/enrollment_token.go b/x-pack/libbeat/management/api/enrollment_token.go new file mode 100644 index 00000000000..dec537d1d52 --- /dev/null +++ b/x-pack/libbeat/management/api/enrollment_token.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "fmt" + "net/http" +) + +// CreateEnrollmentToken talks to Kibana API and generates an enrollment token +func (c *Client) CreateEnrollmentToken() (string, error) { + headers := http.Header{} + + resp := struct { + Tokens []string `json:"tokens"` + }{} + _, err := c.request("POST", "/api/beats/enrollment_tokens", nil, headers, &resp) + if err != nil { + return "", err + } + + if len(resp.Tokens) != 1 { + return "", fmt.Errorf("Unexpected number of tokens, got %d, only one expected", len(resp.Tokens)) + } + + return resp.Tokens[0], nil +} diff --git a/x-pack/libbeat/management/api/enrollment_token_test.go b/x-pack/libbeat/management/api/enrollment_token_test.go new file mode 100644 index 00000000000..2e4ce156849 --- /dev/null +++ b/x-pack/libbeat/management/api/enrollment_token_test.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package api + +import ( + "fmt" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEnrollmentToken(t *testing.T) { + server, client := newServerClientPair(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Check correct path is used + assert.Equal(t, "/api/beats/enrollment_tokens", r.URL.Path) + fmt.Fprintf(w, `{"tokens":["65074ff8639a4661ba7e1bd5ccc209ed"]}`) + })) + defer server.Close() + + token, err := client.CreateEnrollmentToken() + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, "65074ff8639a4661ba7e1bd5ccc209ed", token) +} diff --git a/x-pack/libbeat/management/config.go b/x-pack/libbeat/management/config.go new file mode 100644 index 00000000000..e76a66c757c --- /dev/null +++ b/x-pack/libbeat/management/config.go @@ -0,0 +1,86 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "os" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/file" + "github.com/elastic/beats/libbeat/kibana" + "github.com/elastic/beats/libbeat/paths" + "github.com/elastic/beats/x-pack/libbeat/management/api" + + "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +// Config for central management +type Config struct { + // true when enrolled + Enabled bool + + // Poll configs period + Period time.Duration + + AccessToken string + + Kibana *kibana.ClientConfig + + Configs api.ConfigBlocks +} + +func defaultConfig() *Config { + return &Config{ + Period: 60 * time.Second, + } +} + +// Load settings from its source file +func (c *Config) Load() error { + path := paths.Resolve(paths.Data, "management.yml") + config, err := common.LoadFile(path) + if err != nil { + if os.IsNotExist(err) { + // File is not present, beat is not enrolled + return nil + } + return err + } + + if err = config.Unpack(&c); err != nil { + return err + } + + return nil +} + +// Save settings to management.yml file +func (c *Config) Save() error { + path := paths.Resolve(paths.Data, "management.yml") + + data, err := yaml.Marshal(c) + if err != nil { + return err + } + + // write temporary file first + tempFile := path + ".new" + f, err := os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return errors.Wrap(err, "failed to store central management settings") + } + + _, err = f.Write(data) + f.Close() + if err != nil { + return err + } + + // move temporary file into final location + err = file.SafeFileRotate(path, tempFile) + return err +} diff --git a/x-pack/libbeat/management/enroll.go b/x-pack/libbeat/management/enroll.go new file mode 100644 index 00000000000..70ec42697db --- /dev/null +++ b/x-pack/libbeat/management/enroll.go @@ -0,0 +1,41 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "github.com/elastic/beats/libbeat/cmd/instance" + "github.com/elastic/beats/x-pack/libbeat/management/api" +) + +// Enroll this beat to the given kibana +// This will use Central Management API to enroll and retrieve an access key for config retrieval +func Enroll(beat *instance.Beat, kibanaURL, enrollmentToken string) error { + config, err := api.ConfigFromURL(kibanaURL) + if err != nil { + return err + } + + // Ignore kibana version to avoid permission errors + config.IgnoreVersion = true + + client, err := api.NewClient(config) + if err != nil { + return err + } + + accessToken, err := client.Enroll(beat.Info.Beat, beat.Info.Name, beat.Info.Version, beat.Info.Hostname, beat.Info.UUID, enrollmentToken) + if err != nil { + return err + } + + // Enrolled, persist state + // TODO use beat.Keystore() for access_token + settings := defaultConfig() + settings.Enabled = true + settings.AccessToken = accessToken + settings.Kibana = config + + return settings.Save() +} diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go new file mode 100644 index 00000000000..d9c3d0b5943 --- /dev/null +++ b/x-pack/libbeat/management/manager.go @@ -0,0 +1,208 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "sync" + "time" + + "github.com/elastic/beats/libbeat/common/reload" + + "github.com/satori/go.uuid" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/libbeat/management/api" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/management" +) + +func init() { + management.Register("x-pack", NewConfigManager, feature.Beta) +} + +// ConfigManager handles internal config updates. By retrieving +// new configs from Kibana and applying them to the Beat +type ConfigManager struct { + config *Config + logger *logp.Logger + client *api.Client + beatUUID uuid.UUID + done chan struct{} + registry *reload.Registry + wg sync.WaitGroup +} + +// NewConfigManager returns a X-Pack Beats Central Management manager +func NewConfigManager(registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { + c := defaultConfig() + if err := c.Load(); err != nil { + return nil, errors.Wrap(err, "reading central management internal settings") + } + return NewConfigManagerWithConfig(c, registry, beatUUID) +} + +// NewConfigManagerWithConfig returns a X-Pack Beats Central Management manager +func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { + var client *api.Client + if c.Enabled { + var err error + + // Ignore kibana version to avoid permission errors + c.Kibana.IgnoreVersion = true + + client, err = api.NewClient(c.Kibana) + if err != nil { + return nil, errors.Wrap(err, "initializing kibana client") + } + } + + return &ConfigManager{ + config: c, + logger: logp.NewLogger(management.DebugK), + client: client, + done: make(chan struct{}), + beatUUID: beatUUID, + registry: registry, + }, nil +} + +// Enabled returns true if config management is enabled +func (cm *ConfigManager) Enabled() bool { + return cm.config.Enabled +} + +// Start the config manager +func (cm *ConfigManager) Start() { + if !cm.Enabled() { + return + } + cfgwarn.Beta("Central management is enabled") + cm.logger.Info("Starting central management service") + + cm.wg.Add(1) + go cm.worker() +} + +// Stop the config manager +func (cm *ConfigManager) Stop() { + if !cm.Enabled() { + return + } + cm.logger.Info("Stopping central management service") + close(cm.done) + cm.wg.Wait() +} + +// CheckRawConfig check settings are correct to start the beat. This method +// checks there are no collision between the existing configuration and what +// central management can configure. +func (cm *ConfigManager) CheckRawConfig(cfg *common.Config) error { + // TODO implement this method + return nil +} + +func (cm *ConfigManager) worker() { + defer cm.wg.Done() + + // Initial fetch && apply (even if errors happen while fetching) + firstRun := true + period := 0 * time.Second + + // Start worker loop: fetch + apply + cache new settings + for { + select { + case <-cm.done: + return + case <-time.After(period): + } + + changed := cm.fetch() + if changed || firstRun { + // configs changed, apply changes + // TODO only reload the blocks that changed + cm.apply() + } + + if changed { + // store new configs (already applied) + cm.logger.Info("Storing new state") + if err := cm.config.Save(); err != nil { + cm.logger.Errorf("error storing central management state: %s", err) + } + } + + if firstRun { + period = cm.config.Period + firstRun = false + } + } +} + +// fetch configurations from kibana, return true if they changed +func (cm *ConfigManager) fetch() bool { + cm.logger.Debug("Retrieving new configurations from Kibana") + configs, err := cm.client.Configuration(cm.config.AccessToken, cm.beatUUID) + if err != nil { + cm.logger.Errorf("error retriving new configurations, will use cached ones: %s", err) + return false + } + + if api.ConfigBlocksEqual(configs, cm.config.Configs) { + cm.logger.Debug("configuration didn't change, sleeping") + return false + } + + cm.logger.Info("New configurations retrieved") + cm.config.Configs = configs + + return true +} + +func (cm *ConfigManager) apply() { + for _, b := range cm.config.Configs { + cm.reload(b.Type, b.Blocks) + } +} + +func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) { + cm.logger.Infof("Applying settings for %s", t) + + if obj := cm.registry.GetReloadable(t); obj != nil { + // Single object + if len(blocks) != 1 { + cm.logger.Errorf("got an invalid number of configs for %s: %d, expected: 1", t, len(blocks)) + return + } + config, err := blocks[0].ConfigWithMeta() + if err != nil { + cm.logger.Error(err) + return + } + + if err := obj.Reload(config); err != nil { + cm.logger.Error(err) + } + } else if obj := cm.registry.GetReloadableList(t); obj != nil { + // List + var configs []*reload.ConfigWithMeta + for _, block := range blocks { + config, err := block.ConfigWithMeta() + if err != nil { + cm.logger.Error(err) + continue + } + configs = append(configs, config) + } + + if err := obj.Reload(configs); err != nil { + cm.logger.Error(err) + } + } +} diff --git a/x-pack/libbeat/management/manager_test.go b/x-pack/libbeat/management/manager_test.go new file mode 100644 index 00000000000..9e37b4d061e --- /dev/null +++ b/x-pack/libbeat/management/manager_test.go @@ -0,0 +1,93 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/x-pack/libbeat/management/api" +) + +type reloadable struct { + reloaded chan *reload.ConfigWithMeta +} + +func (r *reloadable) Reload(c *reload.ConfigWithMeta) error { + r.reloaded <- c + return nil +} + +func TestConfigManager(t *testing.T) { + registry := reload.NewRegistry() + id := uuid.NewV4() + accessToken := "footoken" + reloadable := reloadable{ + reloaded: make(chan *reload.ConfigWithMeta, 1), + } + registry.MustRegister("test.block", &reloadable) + + mux := http.NewServeMux() + i := 0 + responses := []string{ + // Initial load + `{"configuration_blocks":[{"type":"test.block","config":{"module":"apache2"}}]}`, + + // No change, no reload + `{"configuration_blocks":[{"type":"test.block","config":{"module":"apache2"}}]}`, + + // Changed, reload + `{"configuration_blocks":[{"type":"test.block","config":{"module":"system"}}]}`, + } + mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, fmt.Sprintf("/api/beats/agent/%s/configuration", id), r.RequestURI) + fmt.Fprintf(w, responses[i]) + i++ + })) + + server := httptest.NewServer(mux) + + c, err := api.ConfigFromURL(server.URL) + if err != nil { + t.Fatal(err) + } + + config := &Config{ + Enabled: true, + Period: 100 * time.Millisecond, + Kibana: c, + AccessToken: accessToken, + } + + manager, err := NewConfigManagerWithConfig(config, registry, id) + if err != nil { + t.Fatal(err) + } + + manager.Start() + + // On first reload we will get apache2 module + config1 := <-reloadable.reloaded + assert.Equal(t, &reload.ConfigWithMeta{ + Config: common.MustNewConfigFrom(map[string]interface{}{ + "module": "apache2", + }), + }, config1) + + config2 := <-reloadable.reloaded + assert.Equal(t, &reload.ConfigWithMeta{ + Config: common.MustNewConfigFrom(map[string]interface{}{ + "module": "system", + }), + }, config2) +} diff --git a/x-pack/metricbeat/cmd/root.go b/x-pack/metricbeat/cmd/root.go index fc086b2340a..8a26219bb4e 100644 --- a/x-pack/metricbeat/cmd/root.go +++ b/x-pack/metricbeat/cmd/root.go @@ -4,11 +4,14 @@ package cmd -import "github.com/elastic/beats/metricbeat/cmd" +import ( + "github.com/elastic/beats/metricbeat/cmd" + xpackcmd "github.com/elastic/beats/x-pack/libbeat/cmd" +) // RootCmd to handle beats cli var RootCmd = cmd.RootCmd func init() { - // TODO inject x-pack features + xpackcmd.AddXPack(RootCmd, cmd.Name) } diff --git a/x-pack/packetbeat/cmd/root.go b/x-pack/packetbeat/cmd/root.go index 904eb99dac8..1ddfbdcfd1b 100644 --- a/x-pack/packetbeat/cmd/root.go +++ b/x-pack/packetbeat/cmd/root.go @@ -4,11 +4,14 @@ package cmd -import "github.com/elastic/beats/packetbeat/cmd" +import ( + "github.com/elastic/beats/packetbeat/cmd" + xpackcmd "github.com/elastic/beats/x-pack/libbeat/cmd" +) // RootCmd to handle beats cli var RootCmd = cmd.RootCmd func init() { - // TODO inject x-pack features + xpackcmd.AddXPack(RootCmd, cmd.Name) } diff --git a/x-pack/winlogbeat/cmd/root.go b/x-pack/winlogbeat/cmd/root.go index 5a7236a07d4..b1da4f69a54 100644 --- a/x-pack/winlogbeat/cmd/root.go +++ b/x-pack/winlogbeat/cmd/root.go @@ -4,11 +4,14 @@ package cmd -import "github.com/elastic/beats/winlogbeat/cmd" +import ( + "github.com/elastic/beats/winlogbeat/cmd" + xpackcmd "github.com/elastic/beats/x-pack/libbeat/cmd" +) // RootCmd to handle beats cli var RootCmd = cmd.RootCmd func init() { - // TODO inject x-pack features + xpackcmd.AddXPack(RootCmd, cmd.Name) }