Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Central Management feature #8559

Merged
merged 10 commits into from
Oct 4, 2018
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Report number of open file handles on Windows. {pull}8329[8329]
- Support for Kafka 2.0.0 in kafka output {pull}8399[8399]
- Add setting `setup.kibana.space.id` to support Kibana Spaces {pull}7942[7942]
- Add Beats Central Management {pull}8559[8559]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beat

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/management"
)

// Creator initializes and configures a new Beater instance used to execute
Expand Down Expand Up @@ -64,6 +65,8 @@ type Beat struct {
BeatConfig *common.Config // The beat's own configuration section

Fields []byte // Data from fields.yml

ConfigManager management.ConfigManager // config manager
}

// BeatConfig struct contains the basic configuration of every beat
Expand Down
23 changes: 21 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import (
"github.com/satori/go.uuid"
"go.uber.org/zap"

"github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"

"github.com/elastic/beats/libbeat/api"
"github.com/elastic/beats/libbeat/asset"
"github.com/elastic/beats/libbeat/beat"
Expand All @@ -43,11 +46,13 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/common/seccomp"
"github.com/elastic/beats/libbeat/dashboards"
"github.com/elastic/beats/libbeat/keystore"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/logp/configure"
"github.com/elastic/beats/libbeat/management"
"github.com/elastic/beats/libbeat/metric/system/host"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/monitoring/report"
Expand All @@ -59,8 +64,6 @@ import (
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/version"
"github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"

// Register publisher pipeline modules
_ "github.com/elastic/beats/libbeat/publisher/includes"
Expand Down Expand Up @@ -313,6 +316,8 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}

reload.Register.MustRegister("output", pipeline.OutputReloader())

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()
Expand Down Expand Up @@ -393,6 +398,10 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
api.Start(b.Config.HTTP)
}

// Launch config manager
b.ConfigManager.Start()
defer b.ConfigManager.Stop()

return beater.Run(&b.Beat)
}

Expand Down Expand Up @@ -573,6 +582,16 @@ func (b *Beat) configure(settings Settings) error {

logp.Info("Beat UUID: %v", b.Info.UUID)

// initialize config manager
b.ConfigManager, err = management.Factory()(reload.Register, b.Beat.Info.UUID)
if err != nil {
return err
}

if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil {
return err
}

if maxProcs := b.Config.MaxProcs; maxProcs > 0 {
runtime.GOMAXPROCS(maxProcs)
}
Expand Down
78 changes: 78 additions & 0 deletions libbeat/common/cli/password.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cli

import (
"fmt"
"os"
"strings"
"syscall"

"github.com/pkg/errors"
"golang.org/x/crypto/ssh/terminal"
)

type method func(m string) (string, error)

var methods = map[string]method{
"stdin": stdin,
"env": env,
}

// ReadPassword allows to read a password passed as a command line parameter.
// It offers several ways to read the password so it is not directly passed as a plain text argument:
// stdin - Will prompt the user to input the password
// env:VAR_NAME - Will read the password from the given env variable
//
func ReadPassword(def string) (string, error) {
if len(def) == 0 {
return "", errors.New("empty password definition")
}

var method, params string
parts := strings.SplitN(def, ":", 2)
method = strings.ToLower(parts[0])

if len(parts) == 2 {
params = parts[1]
}

m := methods[method]
if m == nil {
return "", errors.New("unknown password source, use stdin or env:VAR_NAME")
}

return m(params)
}

func stdin(p string) (string, error) {
fmt.Print("Enter password: ")
bytePassword, err := terminal.ReadPassword(int(syscall.Stdin))
if err != nil {
return "", errors.Wrap(err, "reading password input")
}
return string(bytePassword), nil
}

func env(p string) (string, error) {
if len(p) == 0 {
return "", errors.New("env variable name is needed when using env: password method")
}

return os.Getenv(p), nil
}
65 changes: 65 additions & 0 deletions libbeat/common/cli/password_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cli

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestReadPassword(t *testing.T) {
os.Setenv("FOO", "random")

tests := []struct {
name string
input string
password string
error bool
}{
{
name: "Test env variable",
input: "env:FOO",
password: "random",
},
{
name: "Test unknown method",
input: "foo:bar",
error: true,
},
{
name: "Test empty input",
input: "",
error: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
password, err := ReadPassword(test.input)
assert.Equal(t, test.password, password)

if test.error {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}
2 changes: 1 addition & 1 deletion libbeat/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
"strings"

ucfg "github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/cfgutil"
"github.com/elastic/go-ucfg/yaml"

"github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/go-ucfg/cfgutil"
)

var flagStrictPerms = flag.Bool("strict.perms", true, "Strict permission checking on config files")
Expand Down
28 changes: 15 additions & 13 deletions libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// Register holds a registry of reloadable objects
var Register = newRegistry()
var Register = NewRegistry()

// ConfigWithMeta holds a pair of common.Config and optional metadata for it
type ConfigWithMeta struct {
Expand All @@ -47,21 +47,23 @@ type Reloadable interface {
Reload(config *ConfigWithMeta) error
}

type registry struct {
// Registry of reloadable objects and lists
type Registry struct {
sync.RWMutex
confsLists map[string]ReloadableList
confs map[string]Reloadable
}

func newRegistry() *registry {
return &registry{
// NewRegistry initializes and returns a reload registry
func NewRegistry() *Registry {
return &Registry{
confsLists: make(map[string]ReloadableList),
confs: make(map[string]Reloadable),
}
}

// Register declares a reloadable object
func (r *registry) Register(name string, obj Reloadable) error {
func (r *Registry) Register(name string, obj Reloadable) error {
r.Lock()
defer r.Unlock()

Expand All @@ -78,7 +80,7 @@ func (r *registry) Register(name string, obj Reloadable) error {
}

// RegisterList declares a reloadable list of configurations
func (r *registry) RegisterList(name string, list ReloadableList) error {
func (r *Registry) RegisterList(name string, list ReloadableList) error {
r.Lock()
defer r.Unlock()

Expand All @@ -95,34 +97,34 @@ func (r *registry) RegisterList(name string, list ReloadableList) error {
}

// MustRegister declares a reloadable object
func (r *registry) MustRegister(name string, obj Reloadable) {
func (r *Registry) MustRegister(name string, obj Reloadable) {
if err := r.Register(name, obj); err != nil {
panic(err)
}
}

// MustRegisterList declares a reloadable object list
func (r *registry) MustRegisterList(name string, list ReloadableList) {
func (r *Registry) MustRegisterList(name string, list ReloadableList) {
if err := r.RegisterList(name, list); err != nil {
panic(err)
}
}

// Get returns the reloadable object with the given name, nil if not found
func (r *registry) Get(name string) Reloadable {
// GetReloadable returns the reloadable object with the given name, nil if not found
func (r *Registry) GetReloadable(name string) Reloadable {
r.RLock()
defer r.RUnlock()
return r.confs[name]
}

// GetList returns the reloadable list with the given name, nil if not found
func (r *registry) GetList(name string) ReloadableList {
// GetReloadableList returns the reloadable list with the given name, nil if not found
func (r *Registry) GetReloadableList(name string) ReloadableList {
r.RLock()
defer r.RUnlock()
return r.confsLists[name]
}

func (r *registry) nameTaken(name string) bool {
func (r *Registry) nameTaken(name string) bool {
if _, ok := r.confs[name]; ok {
return true
}
Expand Down
16 changes: 8 additions & 8 deletions libbeat/common/reload/reload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ type reloadableList struct{}
func (reloadable) Reload(config *ConfigWithMeta) error { return nil }
func (reloadableList) Reload(config []*ConfigWithMeta) error { return nil }

func RegisterReloadable(t *testing.T) {
func TestRegisterReloadable(t *testing.T) {
obj := reloadable{}
r := newRegistry()
r := NewRegistry()

r.Register("my.reloadable", obj)

assert.Equal(t, obj, r.Get("my.reloadable"))
assert.Equal(t, obj, r.GetReloadable("my.reloadable"))
}

func RegisterReloadableList(t *testing.T) {
func TestRegisterReloadableList(t *testing.T) {
objl := reloadableList{}
r := newRegistry()
r := NewRegistry()

r.RegisterList("my.reloadable", objl)

assert.Equal(t, objl, r.Get("my.reloadable"))
assert.Equal(t, objl, r.GetReloadableList("my.reloadable"))
}

func TestRegisterNilFails(t *testing.T) {
r := newRegistry()
r := NewRegistry()

err := r.Register("name", nil)
assert.Error(t, err)
Expand All @@ -58,7 +58,7 @@ func TestRegisterNilFails(t *testing.T) {
}

func TestReRegisterFails(t *testing.T) {
r := newRegistry()
r := NewRegistry()

// two obj with the same name
err := r.Register("name", reloadable{})
Expand Down
6 changes: 4 additions & 2 deletions libbeat/kibana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ func NewClientWithConfig(config *ClientConfig) (*Client, error) {
},
}

if err = client.SetVersion(); err != nil {
return nil, fmt.Errorf("fail to get the Kibana version: %v", err)
if !config.IgnoreVersion {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remind me in which case we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically when doing enrollment, Kibana will probably be using x-pack security, so you need user/password to access the version, we don't have it, as enrollment endpoints don't require username/password but a valid token.

This change allows to configure the client to ignore kibana version check when doing enrollment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Chicken / egg problem here :-)

if err = client.SetVersion(); err != nil {
return nil, fmt.Errorf("fail to get the Kibana version: %v", err)
}
}

return client, nil
Expand Down
Loading