diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index cce864723f3..3f0419ca38d 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -69,3 +69,4 @@ - Push log level downstream {pull}22815[22815] - Add metrics collection for Agent {pull}22793[22793] - Add support for Fleet Server {pull}23736[23736] +- Add support for enrollment with local bootstrap of Fleet Server {pull}23865[23865] diff --git a/x-pack/elastic-agent/pkg/agent/application/application.go b/x-pack/elastic-agent/pkg/agent/application/application.go index af50477d5fd..c13157e02f5 100644 --- a/x-pack/elastic-agent/pkg/agent/application/application.go +++ b/x-pack/elastic-agent/pkg/agent/application/application.go @@ -6,6 +6,11 @@ package application import ( "context" + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/upgrade" @@ -31,7 +36,7 @@ type upgraderControl interface { } // New creates a new Agent and bootstrap the required subsystem. -func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, uc upgraderControl, agentInfo *info.AgentInfo) (Application, error) { +func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, statusCtrl status.Controller, uc upgraderControl, agentInfo *info.AgentInfo) (Application, error) { // Load configuration from disk to understand in which mode of operation // we must start the elastic-agent, the mode of operation cannot be changed without restarting the // elastic-agent. @@ -44,7 +49,7 @@ func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, uc upg return nil, err } - return createApplication(log, pathConfigFile, rawConfig, reexec, uc, agentInfo) + return createApplication(log, pathConfigFile, rawConfig, reexec, statusCtrl, uc, agentInfo) } func createApplication( @@ -52,6 +57,7 @@ func createApplication( pathConfigFile string, rawConfig *config.Config, reexec reexecManager, + statusCtrl status.Controller, uc upgraderControl, agentInfo *info.AgentInfo, ) (Application, error) { @@ -66,14 +72,72 @@ func createApplication( if IsStandalone(cfg.Fleet) { log.Info("Agent is managed locally") - return newLocal(ctx, log, pathConfigFile, rawConfig, reexec, uc, agentInfo) + return newLocal(ctx, log, pathConfigFile, rawConfig, reexec, statusCtrl, uc, agentInfo) + } + + // not in standalone; both modes require reading the fleet.yml configuration file + var store storage.Store + store, cfg, err = mergeFleetConfig(rawConfig) + + if IsFleetServerBootstrap(cfg.Fleet) { + log.Info("Agent is in Fleet Server bootstrap mode") + return newFleetServerBootstrap(ctx, log, pathConfigFile, rawConfig, statusCtrl, agentInfo) } log.Info("Agent is managed by Fleet") - return newManaged(ctx, log, rawConfig, reexec, agentInfo) + return newManaged(ctx, log, store, cfg, rawConfig, reexec, statusCtrl, agentInfo) } // IsStandalone decides based on missing of fleet.enabled: true or fleet.{access_token,kibana} will place Elastic Agent into standalone mode. func IsStandalone(cfg *configuration.FleetAgentConfig) bool { return cfg == nil || !cfg.Enabled } + +// IsFleetServerBootstrap decides if Elastic Agent is started in bootstrap mode. +func IsFleetServerBootstrap(cfg *configuration.FleetAgentConfig) bool { + return cfg != nil && cfg.Server != nil && cfg.Server.Bootstrap +} + +func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) { + path := info.AgentConfigFile() + store := storage.NewDiskStore(path) + reader, err := store.Load() + if err != nil { + return store, nil, errors.New(err, "could not initialize config store", + errors.TypeFilesystem, + errors.M(errors.MetaKeyPath, path)) + } + config, err := config.NewConfigFrom(reader) + if err != nil { + return store, nil, errors.New(err, + fmt.Sprintf("fail to read configuration %s for the elastic-agent", path), + errors.TypeFilesystem, + errors.M(errors.MetaKeyPath, path)) + } + + // merge local configuration and configuration persisted from fleet. + err = rawConfig.Merge(config) + if err != nil { + return store, nil, errors.New(err, + fmt.Sprintf("fail to merge configuration with %s for the elastic-agent", path), + errors.TypeConfig, + errors.M(errors.MetaKeyPath, path)) + } + + cfg, err := configuration.NewFromConfig(rawConfig) + if err != nil { + return store, nil, errors.New(err, + fmt.Sprintf("fail to unpack configuration from %s", path), + errors.TypeFilesystem, + errors.M(errors.MetaKeyPath, path)) + } + + if err := cfg.Fleet.Valid(); err != nil { + return store, nil, errors.New(err, + "fleet configuration is invalid", + errors.TypeFilesystem, + errors.M(errors.MetaKeyPath, path)) + } + + return store, cfg, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/application/config.go b/x-pack/elastic-agent/pkg/agent/application/config.go index 8dfd093e040..d0eb80449a2 100644 --- a/x-pack/elastic-agent/pkg/agent/application/config.go +++ b/x-pack/elastic-agent/pkg/agent/application/config.go @@ -26,3 +26,26 @@ func createFleetConfigFromEnroll(accessAPIKey string, kbn *kibana.Config) (*conf } return cfg, nil } + +func createFleetServerBootstrapConfig(connStr string, policyID string) (*configuration.FleetAgentConfig, error) { + es, err := configuration.ElasticsearchFromConnStr(connStr) + if err != nil { + return nil, err + } + cfg := configuration.DefaultFleetAgentConfig() + cfg.Enabled = true + cfg.Server = &configuration.FleetServerConfig{ + Bootstrap: true, + Output: configuration.FleetServerOutputConfig{ + Elasticsearch: es, + }, + } + if policyID != "" { + cfg.Server.Policy = &configuration.FleetServerPolicyConfig{ID: policyID} + } + + if err := cfg.Valid(); err != nil { + return nil, errors.New(err, "invalid enrollment options", errors.TypeConfig) + } + return cfg, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go b/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go index fe4cafa7594..82d996bd620 100644 --- a/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/enroll_cmd.go @@ -9,9 +9,15 @@ import ( "context" "fmt" "io" + "math/rand" "net/http" "net/url" "os" + "time" + + "github.com/elastic/beats/v7/libbeat/common/backoff" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" "gopkg.in/yaml.v2" @@ -25,6 +31,16 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) +const ( + waitingForAgent = "waiting for Elastic Agent to start" + waitingForFleetServer = "waiting for Elastic Agent to start Fleet Server" +) + +var ( + enrollDelay = 1 * time.Second // max delay to start enrollment + daemonTimeout = 30 * time.Second // max amount of for communication to running Agent daemon +) + type store interface { Save(io.Reader) error } @@ -66,6 +82,9 @@ type EnrollCmdOption struct { UserProvidedMetadata map[string]interface{} EnrollAPIKey string Staging string + FleetServerConnStr string + FleetServerPolicyID string + NoRestart bool } func (e *EnrollCmdOption) kibanaConfig() (*kibana.Config, error) { @@ -149,7 +168,95 @@ func NewEnrollCmdWithStore( } // Execute tries to enroll the agent into Fleet. -func (c *EnrollCmd) Execute() error { +func (c *EnrollCmd) Execute(ctx context.Context) error { + if c.options.FleetServerConnStr != "" { + err := c.fleetServerBootstrap(ctx) + if err != nil { + return err + } + + // enroll should use localhost as fleet-server is now running + // it must also restart + c.options.URL = "http://localhost:8000" + c.options.NoRestart = false + } + + err := c.enrollWithBackoff(ctx) + if err != nil { + return errors.New(err, "fail to enroll") + } + + if c.options.NoRestart { + return nil + } + + if c.daemonReload(ctx) != nil { + c.log.Info("Elastic Agent might not be running; unable to trigger restart") + } + c.log.Info("Successfully triggered restart on running Elastic Agent.") + return nil +} + +func (c *EnrollCmd) fleetServerBootstrap(ctx context.Context) error { + c.log.Debug("verifying communication with running Elastic Agent daemon") + _, err := getDaemonStatus(ctx) + if err != nil { + return errors.New("failed to communicate with elastic-agent daemon; is elastic-agent running?") + } + + fleetConfig, err := createFleetServerBootstrapConfig(c.options.FleetServerConnStr, c.options.FleetServerPolicyID) + configToStore := map[string]interface{}{ + "fleet": fleetConfig, + } + reader, err := yamlToReader(configToStore) + if err != nil { + return err + } + if err := c.configStore.Save(reader); err != nil { + return errors.New(err, "could not save fleet server bootstrap information", errors.TypeFilesystem) + } + + err = c.daemonReload(ctx) + if err != nil { + return errors.New(err, "failed to trigger elastic-agent daemon reload", errors.TypeApplication) + } + + err = waitForFleetServer(ctx, c.log) + if err != nil { + return errors.New(err, "fleet-server never started by elastic-agent daemon", errors.TypeApplication) + } + return nil +} + +func (c *EnrollCmd) daemonReload(ctx context.Context) error { + daemon := client.New() + err := daemon.Connect(ctx) + if err != nil { + return err + } + defer daemon.Disconnect() + return daemon.Restart(ctx) +} + +func (c *EnrollCmd) enrollWithBackoff(ctx context.Context) error { + delay(ctx, enrollDelay) + + err := c.enroll(ctx) + signal := make(chan struct{}) + backExp := backoff.NewExpBackoff(signal, 60*time.Second, 10*time.Minute) + + for errors.Is(err, fleetapi.ErrTooManyRequests) { + c.log.Warn("Too many requests on the remote server, will retry in a moment.") + backExp.Wait() + c.log.Info("Retrying to enroll...") + err = c.enroll(ctx) + } + + close(signal) + return err +} + +func (c *EnrollCmd) enroll(ctx context.Context) error { cmd := fleetapi.NewEnrollCmd(c.client) metadata, err := metadata() @@ -167,7 +274,7 @@ func (c *EnrollCmd) Execute() error { }, } - resp, err := cmd.Execute(context.Background(), r) + resp, err := cmd.Execute(ctx, r) if err != nil { return errors.New(err, "fail to execute request to Kibana", @@ -184,6 +291,15 @@ func (c *EnrollCmd) Execute() error { "sourceURI": staging, } } + if c.options.FleetServerConnStr != "" { + serverConfig, err := createFleetServerBootstrapConfig(c.options.FleetServerConnStr, c.options.FleetServerPolicyID) + if err != nil { + return err + } + // no longer need bootstrap at this point + serverConfig.Server.Bootstrap = false + fleetConfig.Server = serverConfig.Server + } configToStore := map[string]interface{}{ "fleet": fleetConfig, @@ -225,3 +341,98 @@ func yamlToReader(in interface{}) (io.Reader, error) { } return bytes.NewReader(data), nil } + +func delay(ctx context.Context, d time.Duration) { + t := time.NewTimer(time.Duration(rand.Int63n(int64(d)))) + defer t.Stop() + select { + case <-ctx.Done(): + case <-t.C: + } +} + +func getDaemonStatus(ctx context.Context) (*client.AgentStatus, error) { + ctx, cancel := context.WithTimeout(ctx, daemonTimeout) + defer cancel() + daemon := client.New() + err := daemon.Connect(ctx) + if err != nil { + return nil, err + } + defer daemon.Disconnect() + return daemon.Status(ctx) +} + +type waitResult struct { + err error +} + +func waitForFleetServer(ctx context.Context, log *logger.Logger) error { + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + resChan := make(chan waitResult) + innerCtx, innerCancel := context.WithCancel(context.Background()) + defer innerCancel() + go func() { + msg := "" + for { + <-time.After(1 * time.Second) + status, err := getDaemonStatus(innerCtx) + if err == context.Canceled { + resChan <- waitResult{err: err} + return + } + if err != nil { + log.Debug(waitingForAgent) + if msg != waitingForAgent { + msg = waitingForAgent + log.Info(waitingForAgent) + } + continue + } + app := getAppFromStatus(status, "fleet-server") + if app == nil { + log.Debug(waitingForFleetServer) + if msg != waitingForFleetServer { + msg = waitingForFleetServer + log.Info(waitingForFleetServer) + } + continue + } + log.Debugf("fleet-server status: %s - %s", app.Status, app.Message) + if app.Status == proto.Status_DEGRADED || app.Status == proto.Status_HEALTHY { + // app has started and is running + resChan <- waitResult{} + break + } + appMsg := fmt.Sprintf("Fleet Server - %s", app.Message) + if msg != appMsg { + msg = appMsg + log.Info(appMsg) + } + } + }() + + var res waitResult + select { + case <-ctx.Done(): + innerCancel() + res = <-resChan + case res = <-resChan: + } + + if res.err != nil { + return res.err + } + return nil +} + +func getAppFromStatus(status *client.AgentStatus, name string) *client.ApplicationStatus { + for _, app := range status.Applications { + if app.Name == name { + return app + } + } + return nil +} diff --git a/x-pack/elastic-agent/pkg/agent/application/enroll_cmd_test.go b/x-pack/elastic-agent/pkg/agent/application/enroll_cmd_test.go index 080b5efcb69..fe6786276d2 100644 --- a/x-pack/elastic-agent/pkg/agent/application/enroll_cmd_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/enroll_cmd_test.go @@ -6,6 +6,7 @@ package application import ( "bytes" + "context" "crypto/tls" "io" "io/ioutil" @@ -94,7 +95,7 @@ func TestEnroll(t *testing.T) { ) require.NoError(t, err) - err = cmd.Execute() + err = cmd.Execute(context.Background()) require.Error(t, err) }, )) @@ -147,7 +148,7 @@ func TestEnroll(t *testing.T) { ) require.NoError(t, err) - err = cmd.Execute() + err = cmd.Execute(context.Background()) require.NoError(t, err) config, err := readConfig(store.Content) @@ -205,7 +206,7 @@ func TestEnroll(t *testing.T) { ) require.NoError(t, err) - err = cmd.Execute() + err = cmd.Execute(context.Background()) require.NoError(t, err) require.True(t, store.Called) @@ -265,7 +266,7 @@ func TestEnroll(t *testing.T) { ) require.NoError(t, err) - err = cmd.Execute() + err = cmd.Execute(context.Background()) require.NoError(t, err) require.True(t, store.Called) @@ -310,7 +311,7 @@ func TestEnroll(t *testing.T) { ) require.NoError(t, err) - err = cmd.Execute() + err = cmd.Execute(context.Background()) require.Error(t, err) require.False(t, store.Called) }, diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go index 0ec71d7a5fa..225c50e65e6 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go @@ -6,9 +6,12 @@ package application import ( "context" + "fmt" "sync" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "github.com/elastic/beats/v7/libbeat/common/backoff" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -57,10 +60,18 @@ type fleetAcker interface { Commit(ctx context.Context) error } -// fleetGateway is a gateway between the Agent and the Fleet API, it's take cares of all the +// FleetGateway is a gateway between the Agent and the Fleet API, it's take cares of all the // bidirectional communication requirements. The gateway aggregates events and will periodically // call the API to send the events and will receive actions to be executed locally. // The only supported action for now is a "ActionPolicyChange". +type FleetGateway interface { + // Start starts the gateway. + Start() error + + // Set the client for the gateway. + SetClient(clienter) +} + type fleetGateway struct { bgContext context.Context log *logger.Logger @@ -90,7 +101,7 @@ func newFleetGateway( acker fleetAcker, statusController status.Controller, stateStore *stateStore, -) (*fleetGateway, error) { +) (FleetGateway, error) { scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter) return newFleetGatewayWithScheduler( @@ -120,7 +131,7 @@ func newFleetGatewayWithScheduler( acker fleetAcker, statusController status.Controller, stateStore *stateStore, -) (*fleetGateway, error) { +) (FleetGateway, error) { // Backoff implementation doesn't support the using context as the shutdown mechanism. // So we keep a done channel that will be closed when the current context is shutdown. @@ -142,7 +153,7 @@ func newFleetGatewayWithScheduler( done: done, reporter: r, acker: acker, - statusReporter: statusController.Register("gateway"), + statusReporter: statusController.RegisterComponent("gateway"), statusController: statusController, stateStore: stateStore, }, nil @@ -160,7 +171,7 @@ func (f *fleetGateway) worker() { resp, err := f.doExecute() if err != nil { f.log.Error(err) - f.statusReporter.Update(status.Failed) + f.statusReporter.Update(state.Failed, err.Error()) continue } @@ -170,12 +181,13 @@ func (f *fleetGateway) worker() { } if err := f.dispatcher.Dispatch(f.acker, actions...); err != nil { - f.log.Errorf("failed to dispatch actions, error: %s", err) - f.statusReporter.Update(status.Degraded) + msg := fmt.Sprintf("failed to dispatch actions, error: %s", err) + f.log.Error(msg) + f.statusReporter.Update(state.Degraded, msg) } f.log.Debugf("FleetGateway is sleeping, next update in %s", f.settings.Duration) - f.statusReporter.Update(status.Healthy) + f.statusReporter.Update(state.Healthy, "") case <-f.bgContext.Done(): f.stop() return @@ -270,7 +282,7 @@ func isUnauth(err error) bool { return errors.Is(err, fleetapi.ErrInvalidAPIKey) } -func (f *fleetGateway) Start() { +func (f *fleetGateway) Start() error { f.wg.Add(1) go func(wg *sync.WaitGroup) { defer f.log.Info("Fleet gateway is stopped") @@ -278,6 +290,7 @@ func (f *fleetGateway) Start() { f.worker() }(&f.wg) + return nil } func (f *fleetGateway) stop() { diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_local.go b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_local.go new file mode 100644 index 00000000000..e25e7792fb1 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_local.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "context" + "time" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" +) + +const gatewayWait = 2 * time.Second + +var injectFleetServerInput = map[string]interface{}{ + // outputs is replaced by the fleet-server.spec + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": []string{"localhost:9200"}, + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "fleet-server", + }, + }, +} + +// fleetServerWrapper wraps the fleetGateway to ensure that a local Fleet Server is running before trying +// to communicate with the gateway, which is local to the Elastic Agent. +type fleetServerWrapper struct { + bgContext context.Context + log *logger.Logger + cfg *configuration.FleetAgentConfig + injectedCfg *config.Config + wrapped FleetGateway + emitter emitterFunc +} + +func wrapLocalFleetServer( + ctx context.Context, + log *logger.Logger, + cfg *configuration.FleetAgentConfig, + rawConfig *config.Config, + wrapped FleetGateway, + emitter emitterFunc) (FleetGateway, error) { + if cfg.Server == nil { + // not running a local Fleet Server + return wrapped, nil + } + injectedCfg, err := injectFleetServer(rawConfig) + if err != nil { + return nil, errors.New(err, "failed to inject fleet-server input to start local Fleet Server", errors.TypeConfig) + } + return &fleetServerWrapper{ + bgContext: ctx, + log: log, + cfg: cfg, + injectedCfg: injectedCfg, + wrapped: wrapped, + emitter: emitter, + }, nil +} + +// Start starts the gateway. +func (w *fleetServerWrapper) Start() error { + err := w.emitter(w.injectedCfg) + if err != nil { + return err + } + sleep(w.bgContext, gatewayWait) + return w.wrapped.Start() +} + +// SetClient sets the client for the wrapped gateway. +func (w *fleetServerWrapper) SetClient(client clienter) { + w.wrapped.SetClient(client) +} + +func injectFleetServer(rawConfig *config.Config) (*config.Config, error) { + cfg := map[string]interface{}{} + err := rawConfig.Unpack(cfg) + if err != nil { + return nil, err + } + cloned, err := config.NewConfigFrom(cfg) + if err != nil { + return nil, err + } + err = cloned.Merge(injectFleetServerInput) + if err != nil { + return nil, err + } + return cloned, nil +} + +func sleep(ctx context.Context, d time.Duration) { + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + case <-t.C: + } +} diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go index a31f6a343a2..4af4836936e 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway_test.go @@ -105,7 +105,7 @@ func newTestingDispatcher() *testingDispatcher { return &testingDispatcher{received: make(chan struct{}, 1)} } -type withGatewayFunc func(*testing.T, *fleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper, repo.Backend) +type withGatewayFunc func(*testing.T, FleetGateway, *testingClient, *testingDispatcher, *scheduler.Stepper, repo.Backend) func withGateway(agentInfo agentInfo, settings *fleetGatewaySettings, fn withGatewayFunc) func(t *testing.T) { return func(t *testing.T) { @@ -172,7 +172,7 @@ func TestFleetGateway(t *testing.T) { t.Run("send no event and receive no action", withGateway(agentInfo, settings, func( t *testing.T, - gateway *fleetGateway, + gateway FleetGateway, client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, @@ -197,7 +197,7 @@ func TestFleetGateway(t *testing.T) { t.Run("Successfully connects and receives a series of actions", withGateway(agentInfo, settings, func( t *testing.T, - gateway *fleetGateway, + gateway FleetGateway, client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, @@ -292,7 +292,7 @@ func TestFleetGateway(t *testing.T) { t.Run("send event and receive no action", withGateway(agentInfo, settings, func( t *testing.T, - gateway *fleetGateway, + gateway FleetGateway, client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, @@ -404,7 +404,7 @@ func TestRetriesOnFailures(t *testing.T) { t.Run("When the gateway fails to communicate with the checkin API we will retry", withGateway(agentInfo, settings, func( t *testing.T, - gateway *fleetGateway, + gateway FleetGateway, client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, @@ -460,7 +460,7 @@ func TestRetriesOnFailures(t *testing.T) { Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute}, }, func( t *testing.T, - gateway *fleetGateway, + gateway FleetGateway, client *testingClient, dispatcher *testingDispatcher, scheduler *scheduler.Stepper, diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go b/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go new file mode 100644 index 00000000000..ebdb65ff706 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go @@ -0,0 +1,213 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package application + +import ( + "context" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/go-sysinfo" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" + reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" + logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log" +) + +// FleetServerBootstrap application, does just enough to get a Fleet Server up and running so enrollment +// can complete. +type FleetServerBootstrap struct { + bgContext context.Context + cancelCtxFn context.CancelFunc + log *logger.Logger + Config configuration.FleetAgentConfig + agentInfo *info.AgentInfo + router *router + source source + srv *server.Server +} + +func newFleetServerBootstrap( + ctx context.Context, + log *logger.Logger, + pathConfigFile string, + rawConfig *config.Config, + statusCtrl status.Controller, + agentInfo *info.AgentInfo, +) (*FleetServerBootstrap, error) { + cfg, err := configuration.NewFromConfig(rawConfig) + if err != nil { + return nil, err + } + + if log == nil { + log, err = logger.NewFromConfig("", cfg.Settings.LoggingConfig) + if err != nil { + return nil, err + } + } + + logR := logreporter.NewReporter(log) + + sysInfo, err := sysinfo.Host() + if err != nil { + return nil, errors.New(err, + "fail to get system information", + errors.TypeUnexpected) + } + + bootstrapApp := &FleetServerBootstrap{ + log: log, + agentInfo: agentInfo, + } + + bootstrapApp.bgContext, bootstrapApp.cancelCtxFn = context.WithCancel(ctx) + bootstrapApp.srv, err = server.NewFromConfig(log, cfg.Settings.GRPC, &operation.ApplicationStatusHandler{}) + if err != nil { + return nil, errors.New(err, "initialize GRPC listener") + } + + reporter := reporting.NewReporter(bootstrapApp.bgContext, log, bootstrapApp.agentInfo, logR) + + monitor, err := monitoring.NewMonitor(cfg.Settings) + if err != nil { + return nil, errors.New(err, "failed to initialize monitoring") + } + + router, err := newRouter(log, streamFactory(bootstrapApp.bgContext, agentInfo, cfg.Settings, bootstrapApp.srv, reporter, monitor, statusCtrl)) + if err != nil { + return nil, errors.New(err, "fail to initialize pipeline router") + } + bootstrapApp.router = router + + emit, err := bootstrapEmitter( + bootstrapApp.bgContext, + log, + agentInfo, + router, + &configModifiers{ + Filters: []filterFunc{filters.StreamChecker, injectFleet(rawConfig, sysInfo.Info(), agentInfo)}, + }, + ) + if err != nil { + return nil, err + } + + discover := discoverer(pathConfigFile, cfg.Settings.Path) + bootstrapApp.source = newOnce(log, discover, emit) + return bootstrapApp, nil +} + +// Start starts a managed elastic-agent. +func (b *FleetServerBootstrap) Start() error { + b.log.Info("Agent is starting") + defer b.log.Info("Agent is stopped") + + if err := b.srv.Start(); err != nil { + return err + } + if err := b.source.Start(); err != nil { + return err + } + + return nil +} + +// Stop stops a local agent. +func (b *FleetServerBootstrap) Stop() error { + err := b.source.Stop() + b.cancelCtxFn() + b.router.Shutdown() + b.srv.Stop() + return err +} + +// AgentInfo retrieves elastic-agent information. +func (b *FleetServerBootstrap) AgentInfo() *info.AgentInfo { + return b.agentInfo +} + +func bootstrapEmitter(ctx context.Context, log *logger.Logger, agentInfo transpiler.AgentInfo, router programsDispatcher, modifiers *configModifiers) (emitterFunc, error) { + ch := make(chan *config.Config) + + go func() { + for { + var c *config.Config + select { + case <-ctx.Done(): + return + case c = <-ch: + } + + err := emit(log, agentInfo, router, modifiers, c) + if err != nil { + log.Error(err) + } + } + }() + + return func(c *config.Config) error { + ch <- c + return nil + }, nil +} + +func emit(log *logger.Logger, agentInfo transpiler.AgentInfo, router programsDispatcher, modifiers *configModifiers, c *config.Config) error { + if err := InjectAgentConfig(c); err != nil { + return err + } + + // perform and verify ast translation + m, err := c.ToMapStr() + if err != nil { + return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig) + } + ast, err := transpiler.NewAST(m) + if err != nil { + return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig) + } + for _, filter := range modifiers.Filters { + if err := filter(log, ast); err != nil { + return errors.New(err, "failed to filter configuration", errors.TypeConfig) + } + } + + // overwrite the inputs to only have a single fleet-server input + transpiler.Insert(ast, transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("fleet-server")), + }), + }), "inputs") + + spec, ok := program.SupportedMap["fleet-server"] + if !ok { + return errors.New("missing required fleet-server program specification") + } + ok, err = program.DetectProgram(spec, agentInfo, ast) + if err != nil { + return errors.New(err, "failed parsing the configuration") + } + if !ok { + return errors.New("bootstrap configuration is incorrect causing fleet-server to not be started") + } + + return router.Dispatch(ast.HashStr(), map[routingKey][]program.Program{ + defautlRK: { + { + Spec: spec, + Config: ast, + }, + }, + }) +} diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index e805c7423aa..2c1b33f83ba 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir" reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log" @@ -63,11 +64,11 @@ func newLocal( pathConfigFile string, rawConfig *config.Config, reexec reexecManager, + statusCtrl status.Controller, uc upgraderControl, agentInfo *info.AgentInfo, ) (*Local, error) { - statusController := &noopController{} - caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController) + caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusCtrl) if err != nil { return nil, err } @@ -104,7 +105,7 @@ func newLocal( return nil, errors.New(err, "failed to initialize monitoring") } - router, err := newRouter(log, streamFactory(localApplication.bgContext, agentInfo, cfg.Settings, localApplication.srv, reporter, monitor, statusController)) + router, err := newRouter(log, streamFactory(localApplication.bgContext, agentInfo, cfg.Settings, localApplication.srv, reporter, monitor, statusCtrl)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index c5c6d542d8f..c12c2451e3a 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -52,7 +52,7 @@ type Managed struct { Config configuration.FleetAgentConfig api apiClient agentInfo *info.AgentInfo - gateway *fleetGateway + gateway FleetGateway router *router srv *server.Server stateStore *stateStore @@ -62,58 +62,18 @@ type Managed struct { func newManaged( ctx context.Context, log *logger.Logger, + store storage.Store, + cfg *configuration.Configuration, rawConfig *config.Config, reexec reexecManager, + statusCtrl status.Controller, agentInfo *info.AgentInfo, ) (*Managed, error) { - statusController := status.NewController(log) - caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusController) + caps, err := capabilities.Load(info.AgentCapabilitiesPath(), log, statusCtrl) if err != nil { return nil, err } - path := info.AgentConfigFile() - - store := storage.NewDiskStore(path) - reader, err := store.Load() - if err != nil { - return nil, errors.New(err, "could not initialize config store", - errors.TypeFilesystem, - errors.M(errors.MetaKeyPath, path)) - } - - config, err := config.NewConfigFrom(reader) - if err != nil { - return nil, errors.New(err, - fmt.Sprintf("fail to read configuration %s for the elastic-agent", path), - errors.TypeFilesystem, - errors.M(errors.MetaKeyPath, path)) - } - - // merge local configuration and configuration persisted from fleet. - err = rawConfig.Merge(config) - if err != nil { - return nil, errors.New(err, - fmt.Sprintf("fail to merge configuration with %s for the elastic-agent", path), - errors.TypeConfig, - errors.M(errors.MetaKeyPath, path)) - } - - cfg, err := configuration.NewFromConfig(rawConfig) - if err != nil { - return nil, errors.New(err, - fmt.Sprintf("fail to unpack configuration from %s", path), - errors.TypeFilesystem, - errors.M(errors.MetaKeyPath, path)) - } - - if err := cfg.Fleet.Valid(); err != nil { - return nil, errors.New(err, - "fleet configuration is invalid", - errors.TypeFilesystem, - errors.M(errors.MetaKeyPath, path)) - } - client, err := fleetapi.NewAuthWithConfig(log, cfg.Fleet.AccessAPIKey, cfg.Fleet.Kibana) if err != nil { return nil, errors.New(err, @@ -158,7 +118,7 @@ func newManaged( return nil, errors.New(err, "failed to initialize monitoring") } - router, err := newRouter(log, streamFactory(managedApplication.bgContext, agentInfo, cfg.Settings, managedApplication.srv, combinedReporter, monitor, statusController)) + router, err := newRouter(log, streamFactory(managedApplication.bgContext, agentInfo, cfg.Settings, managedApplication.srv, combinedReporter, monitor, statusCtrl)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } @@ -177,7 +137,7 @@ func newManaged( router, &configModifiers{ Decorators: []decoratorFunc{injectMonitoring}, - Filters: []filterFunc{filters.StreamChecker, injectFleet(config, sysInfo.Info(), agentInfo)}, + Filters: []filterFunc{filters.StreamChecker, injectFleet(rawConfig, sysInfo.Info(), agentInfo)}, }, caps, monitor, @@ -287,12 +247,16 @@ func newManaged( actionDispatcher, fleetR, actionAcker, - statusController, + statusCtrl, stateStore, ) if err != nil { return nil, err } + gateway, err = wrapLocalFleetServer(managedApplication.bgContext, log, cfg.Fleet, rawConfig, gateway, emit) + if err != nil { + return nil, err + } // add the gateway to setters, so the gateway can be updated // when the hosts for Kibana are updated by the policy. policyChanger.setters = append(policyChanger.setters, gateway) @@ -309,11 +273,15 @@ func (m *Managed) Start() error { return nil } - if err := m.upgrader.Ack(m.bgContext); err != nil { + err := m.upgrader.Ack(m.bgContext) + if err != nil { m.log.Warnf("failed to ack update %v", err) } - m.gateway.Start() + err = m.gateway.Start() + if err != nil { + return err + } return nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go b/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go index 1c55fda08e7..b229f3cff08 100644 --- a/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go +++ b/x-pack/elastic-agent/pkg/agent/application/noop_status_controller.go @@ -5,20 +5,23 @@ package application import ( + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" ) type noopController struct{} -func (*noopController) Register(_ string) status.Reporter { return &noopReporter{} } -func (*noopController) RegisterWithPersistance(_ string, _ bool) status.Reporter { +func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} } +func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter { return &noopReporter{} } -func (*noopController) Status() status.AgentStatus { return status.Healthy } -func (*noopController) UpdateStateID(_ string) {} -func (*noopController) StatusString() string { return "online" } +func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} } +func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} } +func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy } +func (*noopController) UpdateStateID(_ string) {} +func (*noopController) StatusString() string { return "online" } type noopReporter struct{} -func (*noopReporter) Update(status.AgentStatus) {} -func (*noopReporter) Unregister() {} +func (*noopReporter) Update(_ state.Status, _ string) {} +func (*noopReporter) Unregister() {} diff --git a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go index 4e75d92fb1d..e23764b9056 100644 --- a/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go +++ b/x-pack/elastic-agent/pkg/agent/application/upgrade/upgrade.go @@ -220,7 +220,7 @@ func (u *Upgrader) ackAction(ctx context.Context, action fleetapi.Action) error u.reporter.OnStateChange( "", agentName, - state.State{Status: state.Running}, + state.State{Status: state.Healthy}, ) return nil diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go index 282ea2f4645..58c99306e71 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go @@ -7,34 +7,27 @@ package cmd import ( "context" "fmt" - "math/rand" "os" - "time" - - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "os/signal" + "syscall" "github.com/spf13/cobra" - "github.com/elastic/beats/v7/libbeat/common/backoff" c "github.com/elastic/beats/v7/libbeat/common/cli" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/warn" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) -var defaultDelay = 1 * time.Second - func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStreams) *cobra.Command { cmd := &cobra.Command{ - Use: "enroll ", + Use: "enroll", Short: "Enroll the Agent into Fleet", Long: "This will enroll the Agent into Fleet.", - Args: cobra.ExactArgs(2), Run: func(c *cobra.Command, args []string) { if err := enroll(streams, c, flags, args); err != nil { fmt.Fprintf(streams.Err, "Error: %v\n", err) @@ -55,19 +48,51 @@ func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStr } func addEnrollFlags(cmd *cobra.Command) { + cmd.Flags().StringP("url", "", "", "URL to enroll Agent into Fleet") + cmd.Flags().StringP("kibana-url", "k", "", "URL of Kibana to enroll Agent into Fleet") + cmd.Flags().StringP("enrollment-token", "t", "", "Enrollment token to use to enroll Agent into Fleet") + cmd.Flags().StringP("fleet-server", "", "", "Start and run a Fleet Server along side this Elastic Agent") + cmd.Flags().StringP("fleet-server-policy", "", "", "Start and run a Fleet Server on this specific policy") cmd.Flags().StringP("certificate-authorities", "a", "", "Comma separated list of root certificate for server verifications") cmd.Flags().StringP("ca-sha256", "p", "", "Comma separated list of certificate authorities hash pins used for certificate verifications") cmd.Flags().BoolP("insecure", "i", false, "Allow insecure connection to Kibana") cmd.Flags().StringP("staging", "", "", "Configures agent to download artifacts from a staging build") } -func buildEnrollmentFlags(cmd *cobra.Command) []string { +func buildEnrollmentFlags(cmd *cobra.Command, url string, token string) []string { + if url == "" { + url, _ = cmd.Flags().GetString("url") + } + if url == "" { + url, _ = cmd.Flags().GetString("kibana-url") + } + if token == "" { + token, _ = cmd.Flags().GetString("enrollment-token") + } + fServer, _ := cmd.Flags().GetString("fleet-server") + fPolicy, _ := cmd.Flags().GetString("fleet-server-policy") ca, _ := cmd.Flags().GetString("certificate-authorities") sha256, _ := cmd.Flags().GetString("ca-sha256") insecure, _ := cmd.Flags().GetBool("insecure") staging, _ := cmd.Flags().GetString("staging") args := []string{} + if url != "" { + args = append(args, "--url") + args = append(args, url) + } + if token != "" { + args = append(args, "--enrollment-token") + args = append(args, token) + } + if fServer != "" { + args = append(args, "--fleet-server") + args = append(args, fServer) + } + if fPolicy != "" { + args = append(args, "--fleet-server-policy") + args = append(args, fPolicy) + } if ca != "" { args = append(args, "--certificate-authorities") args = append(args, ca) @@ -116,9 +141,11 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args } } + noRestart, _ := cmd.Flags().GetBool("no-restart") force, _ := cmd.Flags().GetBool("force") if fromInstall { force = true + noRestart = true } // prompt only when it is not forced and is already enrolled @@ -133,23 +160,26 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args } } - insecure, _ := cmd.Flags().GetBool("insecure") - logger, err := logger.NewFromConfig("", cfg.Settings.LoggingConfig) if err != nil { return err } - url := args[0] - enrollmentToken := args[1] + insecure, _ := cmd.Flags().GetBool("insecure") + url, _ := cmd.Flags().GetString("url") + if url == "" { + url, _ = cmd.Flags().GetString("kibana-url") + } + enrollmentToken, _ := cmd.Flags().GetString("enrollment-token") + fServer, _ := cmd.Flags().GetString("fleet-server") + fPolicy, _ := cmd.Flags().GetString("fleet-server-policy") caStr, _ := cmd.Flags().GetString("certificate-authorities") CAs := cli.StringToSlice(caStr) - caSHA256str, _ := cmd.Flags().GetString("ca-sha256") caSHA256 := cli.StringToSlice(caSHA256str) - delay(defaultDelay) + ctx := handleSignal(context.Background()) options := application.EnrollCmdOption{ ID: "", // TODO(ph), This should not be an empty string, will clarify in a new PR. @@ -160,6 +190,9 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args Insecure: insecure, UserProvidedMetadata: make(map[string]interface{}), Staging: staging, + FleetServerConnStr: fServer, + FleetServerPolicyID: fPolicy, + NoRestart: noRestart, } c, err := application.NewEnrollCmd( @@ -172,46 +205,29 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args return err } - err = c.Execute() - signal := make(chan struct{}) - - backExp := backoff.NewExpBackoff(signal, 60*time.Second, 10*time.Minute) - - for errors.Is(err, fleetapi.ErrTooManyRequests) { - fmt.Fprintln(streams.Out, "Too many requests on the remote server, will retry in a moment.") - backExp.Wait() - fmt.Fprintln(streams.Out, "Retrying to enroll...") - err = c.Execute() - } - - close(signal) - - if err != nil { - return errors.New(err, "fail to enroll") + err = c.Execute(ctx) + if err == nil { + fmt.Fprintln(streams.Out, "Successfully enrolled the Elastic Agent.") } + return err +} - fmt.Fprintln(streams.Out, "Successfully enrolled the Elastic Agent.") +func handleSignal(ctx context.Context) context.Context { + ctx, cfunc := context.WithCancel(ctx) - // skip restarting - noRestart, _ := cmd.Flags().GetBool("no-restart") - if noRestart || fromInstall { - return nil - } + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT) - daemon := client.New() - err = daemon.Connect(context.Background()) - if err == nil { - defer daemon.Disconnect() - err = daemon.Restart(context.Background()) - if err == nil { - fmt.Fprintln(streams.Out, "Successfully triggered restart on running Elastic Agent.") - return nil + go func() { + select { + case <-sigs: + cfunc() + case <-ctx.Done(): } - } - fmt.Fprintln(streams.Out, "Elastic Agent might not be running; unable to trigger restart") - return nil -} -func delay(t time.Duration) { - <-time.After(time.Duration(rand.Int63n(int64(t)))) + signal.Stop(sigs) + close(sigs) + }() + + return ctx } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/install.go b/x-pack/elastic-agent/pkg/agent/cmd/install.go index 7fd5b23ea18..d978cd72d78 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/install.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/install.go @@ -37,8 +37,6 @@ would like the Agent to operate. }, } - cmd.Flags().StringP("kibana-url", "k", "", "URL of Kibana to enroll Agent into Fleet") - cmd.Flags().StringP("enrollment-token", "t", "", "Enrollment token to use to enroll Agent into Fleet") cmd.Flags().BoolP("force", "f", false, "Force overwrite the current and do not prompt for confirmation") addEnrollFlags(cmd) @@ -93,9 +91,12 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, enroll := true askEnroll := true - kibana, _ := cmd.Flags().GetString("kibana-url") + url, _ := cmd.Flags().GetString("url") + if url == "" { + url, _ = cmd.Flags().GetString("kibana-url") + } token, _ := cmd.Flags().GetString("enrollment-token") - if kibana != "" && token != "" { + if url != "" && token != "" { askEnroll = false } if force { @@ -111,18 +112,18 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, enroll = false } } - if !askEnroll && (kibana == "" || token == "") { + if !askEnroll && (url == "" || token == "") { // force was performed without required enrollment arguments, all done (standalone mode) enroll = false } if enroll { - if kibana == "" { - kibana, err = c.ReadInput("Kibana URL you want to enroll this Agent into:") + if url == "" { + url, err = c.ReadInput("URL you want to enroll this Agent into:") if err != nil { return fmt.Errorf("problem reading prompt response") } - if kibana == "" { + if url == "" { fmt.Fprintf(streams.Out, "Enrollment cancelled because no URL was provided.\n") return nil } @@ -144,16 +145,33 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, return err } + defer func() { + if err != nil { + install.Uninstall() + } + }() + + err = install.StartService() + if err != nil { + fmt.Fprintf(streams.Out, "Installation failed to start Elastic Agent service.\n") + return err + } + + defer func() { + if err != nil { + install.StopService() + } + }() + if enroll { - enrollArgs := []string{"enroll", kibana, token, "--from-install"} - enrollArgs = append(enrollArgs, buildEnrollmentFlags(cmd)...) + enrollArgs := []string{"enroll", "--from-install"} + enrollArgs = append(enrollArgs, buildEnrollmentFlags(cmd, url, token)...) enrollCmd := exec.Command(install.ExecutablePath(), enrollArgs...) enrollCmd.Stdin = os.Stdin enrollCmd.Stdout = os.Stdout enrollCmd.Stderr = os.Stderr err = enrollCmd.Start() if err != nil { - install.Uninstall() return fmt.Errorf("failed to execute enroll command: %s", err) } err = enrollCmd.Wait() @@ -167,11 +185,5 @@ func installCmd(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, } } - err = install.StartService() - if err != nil { - fmt.Fprintf(streams.Out, "Installation of required system files was successful, but starting of the service failed.\n") - return err - } - fmt.Fprintf(streams.Out, "Installation was successful and Elastic Agent is running.\n") return nil } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index ecfac4ff740..8d4157fdadd 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -16,6 +16,8 @@ import ( "strings" "syscall" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" + "github.com/spf13/cobra" "github.com/elastic/beats/v7/libbeat/api" @@ -129,14 +131,16 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { // Windows: Mark se rexLogger := logger.Named("reexec") rex := reexec.NewManager(rexLogger, execPath) + statusCtrl := status.NewController(logger) + // start the control listener - control := server.New(logger.Named("control"), rex, nil) + control := server.New(logger.Named("control"), rex, statusCtrl, nil) if err := control.Start(); err != nil { return err } defer control.Stop() - app, err := application.New(logger, pathConfigFile, rex, control, agentInfo) + app, err := application.New(logger, pathConfigFile, rex, statusCtrl, control, agentInfo) if err != nil { return err } diff --git a/x-pack/elastic-agent/pkg/agent/configuration/fleet.go b/x-pack/elastic-agent/pkg/agent/configuration/fleet.go index c8315b81cf0..af60651a362 100644 --- a/x-pack/elastic-agent/pkg/agent/configuration/fleet.go +++ b/x-pack/elastic-agent/pkg/agent/configuration/fleet.go @@ -18,11 +18,17 @@ type FleetAgentConfig struct { Kibana *kibana.Config `config:"kibana" yaml:"kibana"` Reporting *fleetreporter.Config `config:"reporting" yaml:"reporting"` Info *AgentInfo `config:"agent" yaml:"agent"` + Server *FleetServerConfig `config:"server" yaml:"server,omitempty"` } // Valid validates the required fields for accessing the API. func (e *FleetAgentConfig) Valid() error { if e.Enabled { + if e.Server != nil && e.Server.Bootstrap { + // bootstrapping Fleet Server, checks below can be ignored + return nil + } + if len(e.AccessAPIKey) == 0 { return errors.New("empty access token", errors.TypeConfig) } diff --git a/x-pack/elastic-agent/pkg/agent/configuration/fleet_server.go b/x-pack/elastic-agent/pkg/agent/configuration/fleet_server.go new file mode 100644 index 00000000000..3ff7ad91b2e --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/configuration/fleet_server.go @@ -0,0 +1,68 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package configuration + +import ( + "net/url" + + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" +) + +// FleetServerConfig is the configuration written so Elastic Agent can run Fleet Server. +type FleetServerConfig struct { + Bootstrap bool `config:"bootstrap" yaml:"bootstrap,omitempty"` + Policy *FleetServerPolicyConfig `config:"policy" yaml:"policy,omitempty"` + Output FleetServerOutputConfig `config:"output" yaml:"output,omitempty"` +} + +// FleetServerPolicyConfig is the configuration for the policy Fleet Server should run on. +type FleetServerPolicyConfig struct { + ID string `config:"id"` +} + +// FleetServerOutputConfig is the connection for Fleet Server to call to Elasticsearch. +type FleetServerOutputConfig struct { + Elasticsearch Elasticsearch `config:"elasticsearch" yaml:"elasticsearch"` +} + +// Elasticsearch is the configuration for elasticsearch. +type Elasticsearch struct { + Protocol string `config:"protocol" yaml:"protocol"` + Hosts []string `config:"hosts" yaml:"hosts"` + Path string `config:"path" yaml:"path,omitempty"` + Username string `config:"username" yaml:"username"` + Password string `config:"password" yaml:"password"` + TLS *tlscommon.Config `config:"ssl" yaml:"ssl,omitempty"` +} + +// ElasticsearchFromConnStr returns an Elasticsearch configuration from the connection string. +func ElasticsearchFromConnStr(conn string) (Elasticsearch, error) { + u, err := url.Parse(conn) + if err != nil { + return Elasticsearch{}, err + } + if u.Scheme != "http" && u.Scheme != "https" { + return Elasticsearch{}, errors.New("invalid connection string: scheme must be http or https") + } + if u.Host == "" { + return Elasticsearch{}, errors.New("invalid connection string: must include a host") + } + if u.User == nil || u.User.Username() == "" { + return Elasticsearch{}, errors.New("invalid connection string: must include a username") + } + password, ok := u.User.Password() + if !ok { + return Elasticsearch{}, errors.New("invalid connection string: must include a password") + } + return Elasticsearch{ + Protocol: u.Scheme, + Hosts: []string{u.Host}, + Path: u.Path, + Username: u.User.Username(), + Password: password, + TLS: nil, + }, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/control_test.go b/x-pack/elastic-agent/pkg/agent/control/control_test.go index 5c56aed4691..bcda4a0e4ed 100644 --- a/x-pack/elastic-agent/pkg/agent/control/control_test.go +++ b/x-pack/elastic-agent/pkg/agent/control/control_test.go @@ -8,6 +8,8 @@ import ( "context" "testing" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,7 +22,7 @@ import ( ) func TestServerClient_Version(t *testing.T) { - srv := server.New(newErrorLogger(t), nil, nil) + srv := server.New(newErrorLogger(t), nil, nil, nil) err := srv.Start() require.NoError(t, err) defer srv.Stop() @@ -41,6 +43,29 @@ func TestServerClient_Version(t *testing.T) { }, ver) } +func TestServerClient_Status(t *testing.T) { + l := newErrorLogger(t) + statusCtrl := status.NewController(l) + srv := server.New(l, nil, statusCtrl, nil) + err := srv.Start() + require.NoError(t, err) + defer srv.Stop() + + c := client.New() + err = c.Connect(context.Background()) + require.NoError(t, err) + defer c.Disconnect() + + status, err := c.Status(context.Background()) + require.NoError(t, err) + + assert.Equal(t, &client.AgentStatus{ + Status: client.Healthy, + Message: "", + Applications: []*client.ApplicationStatus{}, + }, status) +} + func newErrorLogger(t *testing.T) *logger.Logger { t.Helper() diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go index 0ce970c9256..edd96efdad6 100644 --- a/x-pack/elastic-agent/pkg/agent/control/server/server.go +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -17,26 +17,29 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) // Server is the daemon side of the control protocol. type Server struct { - logger *logger.Logger - rex reexec.ExecManager - up *upgrade.Upgrader - listener net.Listener - server *grpc.Server - lock sync.RWMutex + logger *logger.Logger + rex reexec.ExecManager + statusCtrl status.Controller + up *upgrade.Upgrader + listener net.Listener + server *grpc.Server + lock sync.RWMutex } // New creates a new control protocol server. -func New(log *logger.Logger, rex reexec.ExecManager, up *upgrade.Upgrader) *Server { +func New(log *logger.Logger, rex reexec.ExecManager, statusCtrl status.Controller, up *upgrade.Upgrader) *Server { return &Server{ - logger: log, - rex: rex, - up: up, + logger: log, + rex: rex, + statusCtrl: statusCtrl, + up: up, } } @@ -95,11 +98,11 @@ func (s *Server) Version(_ context.Context, _ *proto.Empty) (*proto.VersionRespo // Status returns the overall status of the agent. func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusResponse, error) { - // not implemented + status := s.statusCtrl.Status() return &proto.StatusResponse{ - Status: proto.Status_HEALTHY, - Message: "not implemented", - Applications: nil, + Status: agentStatusToProto(status.Status), + Message: status.Message, + Applications: agentAppStatusToProto(status.Applications), }, nil } @@ -158,3 +161,27 @@ func (r *upgradeRequest) FleetAction() *fleetapi.ActionUpgrade { // upgrade request not from Fleet return nil } + +func agentStatusToProto(code status.AgentStatusCode) proto.Status { + if code == status.Degraded { + return proto.Status_DEGRADED + } + if code == status.Failed { + return proto.Status_FAILED + } + return proto.Status_HEALTHY +} + +func agentAppStatusToProto(apps []status.AgentApplicationStatus) []*proto.ApplicationStatus { + s := make([]*proto.ApplicationStatus, len(apps)) + for i, a := range apps { + s[i] = &proto.ApplicationStatus{ + Id: a.ID, + Name: a.Name, + Status: proto.Status(a.Status.ToProto()), + Message: a.Message, + Payload: "", + } + } + return s +} diff --git a/x-pack/elastic-agent/pkg/agent/errors/error.go b/x-pack/elastic-agent/pkg/agent/errors/error.go index 7ce5c770349..00c139c93c8 100644 --- a/x-pack/elastic-agent/pkg/agent/errors/error.go +++ b/x-pack/elastic-agent/pkg/agent/errors/error.go @@ -54,6 +54,12 @@ func (e agentError) Unwrap() error { // Error returns a string consisting of a message and originating error. func (e agentError) Error() string { + if e.err == nil { + if e.msg != "" { + return e.msg + } + return "unknown error" + } if e.msg != "" { return errors.Wrap(e.err, e.msg).Error() } diff --git a/x-pack/elastic-agent/pkg/agent/errors/error_test.go b/x-pack/elastic-agent/pkg/agent/errors/error_test.go index faee302b8a0..161120daf4b 100644 --- a/x-pack/elastic-agent/pkg/agent/errors/error_test.go +++ b/x-pack/elastic-agent/pkg/agent/errors/error_test.go @@ -7,7 +7,6 @@ package errors import ( "fmt" "io" - "strings" "testing" "github.com/pkg/errors" @@ -146,48 +145,6 @@ func TestErrors(t *testing.T) { } } -func TestNoErrorNoMsg(t *testing.T) { - actualErr := New() - agentErr, ok := actualErr.(Error) - if !ok { - t.Error("expected Error") - return - } - - e := agentErr.Error() - if !strings.Contains(e, "error_test.go[") { - t.Errorf("Error does not contain source file: %v", e) - } - - if !strings.HasSuffix(e, ": unknown error") { - t.Errorf("Error does not contain default error: %v", e) - } -} - -func TestNoError(t *testing.T) { - // test with message - msg := "msg2" - actualErr := New(msg) - agentErr, ok := actualErr.(Error) - if !ok { - t.Error("expected Error") - return - } - - e := agentErr.Error() - if !strings.Contains(e, "error_test.go[") { - t.Errorf("Error does not contain source file: %v", e) - } - - if !strings.HasSuffix(e, ": unknown error") { - t.Errorf("Error does not contain default error: %v", e) - } - - if !strings.HasPrefix(e, msg) { - t.Errorf("Error does not contain provided message: %v", e) - } -} - func TestMetaFold(t *testing.T) { err1 := fmt.Errorf("level1") err2 := New("level2", err1, M("key1", "level2"), M("key2", "level2")) diff --git a/x-pack/elastic-agent/pkg/agent/errors/generators.go b/x-pack/elastic-agent/pkg/agent/errors/generators.go index 26a067f4ce8..ce9e1961d3b 100644 --- a/x-pack/elastic-agent/pkg/agent/errors/generators.go +++ b/x-pack/elastic-agent/pkg/agent/errors/generators.go @@ -4,13 +4,6 @@ package errors -import ( - "fmt" - "runtime" - - "github.com/pkg/errors" -) - // M creates a meta entry for an error func M(key string, val interface{}) MetaRecord { return MetaRecord{key: key, @@ -43,13 +36,5 @@ func New(args ...interface{}) error { } } - if agentErr.err == nil { - agentErr.err = errors.New("unknown error") - - if _, file, line, ok := runtime.Caller(1); ok { - agentErr.err = errors.Wrapf(agentErr.err, fmt.Sprintf("%s[%d]", file, line)) - } - } - return agentErr } diff --git a/x-pack/elastic-agent/pkg/agent/install/install.go b/x-pack/elastic-agent/pkg/agent/install/install.go index 01b9bd6f616..3e7df33ccd7 100644 --- a/x-pack/elastic-agent/pkg/agent/install/install.go +++ b/x-pack/elastic-agent/pkg/agent/install/install.go @@ -104,6 +104,22 @@ func StartService() error { return nil } +// StopService stops the installed service. +func StopService() error { + svc, err := newService() + if err != nil { + return err + } + err = svc.Stop() + if err != nil { + return errors.New( + err, + fmt.Sprintf("failed to stop service (%s)", ServiceName), + errors.M("service", ServiceName)) + } + return nil +} + // findDirectory returns the directory to copy into the installation location. // // This also verifies that the discovered directory is a valid directory for installation. diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index 49df45071ce..a95bfe5b165 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -103,7 +103,7 @@ func NewOperator( reporter: reporter, monitor: monitor, statusController: statusController, - statusReporter: statusController.Register("operator-" + pipelineID), + statusReporter: statusController.RegisterComponent("operator-" + pipelineID), } operator.initHandlerMap() @@ -142,7 +142,7 @@ func (o *Operator) Close() error { func (o *Operator) HandleConfig(cfg configrequest.Request) error { _, stateID, steps, ack, err := o.stateResolver.Resolve(cfg) if err != nil { - o.statusReporter.Update(status.Failed) + o.statusReporter.Update(state.Failed, err.Error()) return errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to resolve configuration %s, error: %v", cfg, err)) } o.statusController.UpdateStateID(stateID) @@ -151,8 +151,9 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error { if strings.ToLower(step.ProgramSpec.Cmd) != strings.ToLower(monitoringName) { if _, isSupported := program.SupportedMap[strings.ToLower(step.ProgramSpec.Cmd)]; !isSupported { // mark failed, new config cannot be run - o.statusReporter.Update(status.Failed) - return errors.New(fmt.Sprintf("program '%s' is not supported", step.ProgramSpec.Cmd), + msg := fmt.Sprintf("program '%s' is not supported", step.ProgramSpec.Cmd) + o.statusReporter.Update(state.Failed, msg) + return errors.New(msg, errors.TypeApplication, errors.M(errors.MetaKeyAppName, step.ProgramSpec.Cmd)) } @@ -160,18 +161,20 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error { handler, found := o.handlers[step.ID] if !found { - o.statusReporter.Update(status.Failed) - return errors.New(fmt.Sprintf("operator: received unexpected event '%s'", step.ID), errors.TypeConfig) + msg := fmt.Sprintf("operator: received unexpected event '%s'", step.ID) + o.statusReporter.Update(state.Failed, msg) + return errors.New(msg, errors.TypeConfig) } if err := handler(step); err != nil { - o.statusReporter.Update(status.Failed) - return errors.New(err, errors.TypeConfig, fmt.Sprintf("operator: failed to execute step %s, error: %v", step.ID, err)) + msg := fmt.Sprintf("operator: failed to execute step %s, error: %v", step.ID, err) + o.statusReporter.Update(state.Failed, msg) + return errors.New(err, errors.TypeConfig, msg) } } // Ack the resolver should state for next call. - o.statusReporter.Update(status.Healthy) + o.statusReporter.Update(state.Healthy, "") ack() return nil diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go index a7a3547fa88..8966ca9a516 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go @@ -79,7 +79,7 @@ func TestConfigurableRun(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status != state.Running { + if item.Status != state.Healthy { return fmt.Errorf("process never went to running") } return nil @@ -112,7 +112,7 @@ func TestConfigurableRun(t *testing.T) { items := operator.State() item0, ok := items[p.ID()] - if !ok || item0.Status != state.Running { + if !ok || item0.Status != state.Healthy { t.Fatalf("Process no longer running after config %#v", items) } pid := item0.ProcessInfo.PID @@ -154,7 +154,7 @@ func TestConfigurableFailed(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status != state.Running { + if item.Status != state.Healthy { return fmt.Errorf("process never went to running") } pid = item.ProcessInfo.PID @@ -194,7 +194,7 @@ func TestConfigurableFailed(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status == state.Running { + if item.Status == state.Healthy { return fmt.Errorf("process never left running") } return nil @@ -229,7 +229,7 @@ func TestConfigurableFailed(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status != state.Running { + if item.Status != state.Healthy { return fmt.Errorf("process never went to back to running") } return nil @@ -263,7 +263,7 @@ func TestConfigurableCrash(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status != state.Running { + if item.Status != state.Healthy { return fmt.Errorf("process never went to running") } pid = item.ProcessInfo.PID @@ -294,7 +294,7 @@ func TestConfigurableCrash(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status == state.Running { + if item.Status == state.Healthy { return fmt.Errorf("process never left running") } return nil @@ -330,7 +330,7 @@ func TestConfigurableCrash(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status != state.Running { + if item.Status != state.Healthy { return fmt.Errorf("process never went to back to running") } return nil @@ -366,7 +366,7 @@ func TestConfigurableStartStop(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status != state.Running { + if item.Status != state.Healthy { return fmt.Errorf("process never went to running") } return nil @@ -415,7 +415,7 @@ func TestConfigurableService(t *testing.T) { if !ok { return fmt.Errorf("no state for process") } - if item.Status != state.Running { + if item.Status != state.Healthy { return fmt.Errorf("process never went to running") } return nil @@ -448,7 +448,7 @@ func TestConfigurableService(t *testing.T) { items := operator.State() item0, ok := items[p.ID()] - if !ok || item0.Status != state.Running { + if !ok || item0.Status != state.Healthy { t.Fatalf("Process no longer running after config %#v", items) } diff --git a/x-pack/elastic-agent/pkg/agent/program/program.go b/x-pack/elastic-agent/pkg/agent/program/program.go index 87d46bf07a0..7917e1f1053 100644 --- a/x-pack/elastic-agent/pkg/agent/program/program.go +++ b/x-pack/elastic-agent/pkg/agent/program/program.go @@ -55,7 +55,7 @@ func Programs(agentInfo transpiler.AgentInfo, singleConfig *transpiler.AST) (map groupedPrograms := make(map[string][]Program) for k, config := range grouped { - programs, err := detectPrograms(agentInfo, config) + programs, err := DetectPrograms(agentInfo, config) if err != nil { return nil, errors.New(err, errors.TypeConfig, "fail to generate program configuration") } @@ -65,48 +65,18 @@ func Programs(agentInfo transpiler.AgentInfo, singleConfig *transpiler.AST) (map return groupedPrograms, nil } -func detectPrograms(agentInfo transpiler.AgentInfo, singleConfig *transpiler.AST) ([]Program, error) { +// DetectPrograms returns the list of programs detected from the provided configuration. +func DetectPrograms(agentInfo transpiler.AgentInfo, singleConfig *transpiler.AST) ([]Program, error) { programs := make([]Program, 0) for _, spec := range Supported { specificAST := singleConfig.Clone() - if len(spec.Constraints) > 0 { - constraints, err := eql.New(spec.Constraints) - if err != nil { - return nil, err - } - ok, err := constraints.Eval(specificAST) - if err != nil { - return nil, err - } - - if !ok { - continue - } - } - - err := spec.Rules.Apply(agentInfo, specificAST) - if err != nil { - return nil, err - } - - if len(spec.When) == 0 { - return nil, ErrMissingWhen - } - - expression, err := eql.New(spec.When) + ok, err := DetectProgram(spec, agentInfo, specificAST) if err != nil { return nil, err } - - ok, err := expression.Eval(specificAST) - if err != nil { - return nil, err - } - if !ok { continue } - program := Program{ Spec: spec, Config: specificAST, @@ -114,7 +84,42 @@ func detectPrograms(agentInfo transpiler.AgentInfo, singleConfig *transpiler.AST programs = append(programs, program) } return programs, nil +} + +// DetectProgram returns true or false if this program exists in the AST. +// +// Note `ast` is modified to match what the program expects. Should clone the AST before passing to +// this function if you want to still have the original. +func DetectProgram(spec Spec, info transpiler.AgentInfo, ast *transpiler.AST) (bool, error) { + if len(spec.Constraints) > 0 { + constraints, err := eql.New(spec.Constraints) + if err != nil { + return false, err + } + ok, err := constraints.Eval(ast) + if err != nil { + return false, err + } + if !ok { + return false, nil + } + } + + err := spec.Rules.Apply(info, ast) + if err != nil { + return false, err + } + + if len(spec.When) == 0 { + return false, ErrMissingWhen + } + + expression, err := eql.New(spec.When) + if err != nil { + return false, err + } + return expression.Eval(ast) } // KnownProgramNames returns a list of runnable programs by the elastic-agent. diff --git a/x-pack/elastic-agent/pkg/agent/program/supported.go b/x-pack/elastic-agent/pkg/agent/program/supported.go index 0e6b7d7d028..85522517c16 100644 --- a/x-pack/elastic-agent/pkg/agent/program/supported.go +++ b/x-pack/elastic-agent/pkg/agent/program/supported.go @@ -24,7 +24,7 @@ func init() { // spec/heartbeat.yml // spec/metricbeat.yml // spec/packetbeat.yml - unpacked := packer.MustUnpack("eJzMelmXqrq69v33M9btdxqasvbmjLEvxNoE0KKmOIuE3JFEAQ3onmKDZ5z/fkZCI6BWzWaNuc5FDa0Q0rx5m+d54n//sd8t6X9Gu+zf98tvx+W3/ygz/sd//UEyq8Bft/E8ML1Z4HGaY07j3ZrA+bMDrBNZqBeMXA0jZxoiV4kgTkL97rOcXrYxLJ3CXzh7Z+IWIRwlWAsKDEfKLAsOIXT3GM4NZrsqln0e9VWPGLwby4l6CqH3bQbxHsNAcdJT7KSqJT+z3vwHDCwlDIwLs10eQvVyO5/LJrmrEhBc3uJt7EyUONSM0zIwFKIa+wh5StU+jp2JuWMgKN5SMyMg4Gzctivkso0jODox5F8m6bhqB8YBad6RZHgfQU95S80D0YyTeD5bmEWIxs9tX9tMGIifHVDt6dreXVvdNlFimgUF0TFHWsGXX7fT67PqL9KC0VtqJqHmcap7qxCZO9l3/lPjlBiZR5r7O5LRbp/CsV1OoKHhwPiG0ea6n+YPyHHjEDJO8vlUvgPwbmk1Z6I8O3Zh1DbJInhWMHJXLLP2DHb3bV4wPPNQ9490fc/W1TzM5ifc7tHUQnhWMXrtrWu2MBMKlHYtxPY5XV/3TrVgj6GnEN29sfvNvNV4R4b8E0Pzvm2as8zZFsOnZwecOcmYEk3izVLjB2oHCtWVnfPyFL9OzIRk8zgC1mWhBaPpxP8b0QNF9FktTrGrBfsQeUoEvQuGVhlqcT6db//xx79VAbzM2W6b5sUgfH042lBg7Eg+j9+1YM2Qu2P2Zhpq6uYtNTnJ/BPR+IFN1AuGnkozriznu0QcNc6sNXvZxvg6RoFBoE1ymQ52ofb+7LyE+ttLPCXAyJEuXDipTAb8hOZsR9bb2EmN1wi6ZYjc0UxptvF67KztSHU/YeD9KMaZacEB2+YxEiG/2B5EmyPHPO9IHjy9peN0phknNjEsAqwLA3w9Uzrv6J4SIp/PtPMRl0Znj8q/Zploc6bOxNQjONoQnV3EePPLjiLLLInGyhAqsZ/xPUYeRf9szS6+t3Mg63xhwFJwcKZy79b57jyhlvBQK1YRHIn+e/Kync4WJl+CYI00vCPgvXZN8xQifyvW0rU3vZ5ZWvdLaMZat5wtxinLgjKCeOTUbQzwAkNDFWf3ehlPKTAuzBLjeUoIz/u3eFs4IHjC0Fthsc8m7JsUNHHv+8XCqfoBq8R6G1KFM3HbsZ3OumYLVaVA2NPng/YSI+/IkLvG6DXtjPNg3l7/wzLjp3t79dbj0yQ3SxwYKsn4QfgUAafnSarEGCU8VA2RXnhjOwosJXrZxk7W8R3k8VAPygj5rT3rEja9hvs4ZXB0s+fbtbSp+37Kq9tl+uHGIFV9lO5MmWod2x9R8F6fHU6IHfDrOpVu3E6lDTK+ZyAokT7oa3ucgGDNgFG+peaO5KbK7NdpN61iOErC7MxxXfKG6Xm2MBWaB1zuqRlP+px/bO3dTaupSbAYD80PIfTWGHkXpFmnqCoj+yZ1zrJiF2bWIQyU6b0yOcusUzSXMdUpp00s9cs0zYN9pxR8WLo+K5+zhVliqB5ZFqxk326ZqG0WaTwV8OUH5mthU/e87trvavfeHmeLwflNlLgpI911sIxfImgcJvFOj0BweEvNPYajnIF469pFNabdL00EBAes+1tRmhp/XC028Zd0fHKAdcATcxsib4bRRoxRl0TfmE3GOYbnhOr+LtQ9HiJ3HU3obpJ5RxFDNLOEn4g8vFnqrkoEzIDvB9nPVmLnqxK7mlWSr6Hinq4lb5XyJVlGNyVPpDTo8hDNmzInwz7MgoSNd1X6TE3SIsbc48wOTrOM78lixElmpQQEmy9QhInHe+iy6Zv7nCBzL8tMB1HizNpT7T2dTcbp7L36JNA6SPQDgwObjAqi+fwLigsKrHVUqpWpJx+h3g8R8p5oLI/gKJ9lZ86yYP8F+jzMg9zhygCZC5v4l5ksO0GKoaV8mqJSiXj+JdwOafyAQfDUuJ5EWl+3sSh/9CRDcEeynUhZK6r7JYZWgXRRUmUIHVuEdYuKRSrlNPdXIcSKcP8q1GV6E6jpiPVXGRIEWqdh+h6kxpLBcy8N1mg+IeC8YsBYEcAv7KWLdCv03qyZnrqh/BjBN30wStYYmYr0KenOQULQqzz7CM7lZ5s+5Tm7J5oZMuWJUBbnNFjrHeYh1sIVLM+lY9P89Wf3cbV5FmREd+vULUqwjKP6rHBJNOXZATVaPTWl7u/XNr3d87QutSKkRfqXe0CaWLd6eXRuw/VGyOfk6+0+enOeHqb8fvobsCdZpjLrQLWzgJu9FNysq88ghA3UhNpmjyFU7ecjriGc/N61t/QLLBjPsYZYMk767MgU8XpgE+PCgC+hMtX9TQSfBvMEmswDur+mYn3AOz0YR8X2+Nmxgw0d99dSwWr/GGqF2EeMgbGOtKAcjLMnGj3SLNhEyFtR7XxkAkILn5Jtr7f7L43LEnnivWfH9kbincYO31MiGfI40gyV2abaKWGfvtdn9df4ecxma8gEBM2w2vw0y0YJgcFF5GL8A6V9MH+lMiBPlOHpRyz2WsKdm5iqIYCyRCavfXrIXgsHWDot1TUB4qxZwoC3HTy7vF7tnyzzoMQLtfIPkKjhNQ6qMQA+MgET8003Bx0YsHYka/2jcEDtO9f381AfF9QOUqoHVwgPEoXZ5qoLPVvmbPsKBbsL0Z6ubZqVRdo/r1DlGiOFA9Qzs6/vkyxQcHY+suv+jq+XUA2R8M151wc6vqoMYkr8zy+9eWx/Ta++J2Ko7Stg/vL6TFBG4eu/F04DfogyCacFJjhIWJ3h41tq/m1VQd810c3WT3HuHkX+HNRFWRtwBxv9CMS84qftA8VDkf5LBV3J3qdMS7jIZ13I2Jn78CfAx70zYSWG/o6WdC/26GpFgrMicctT7Io8JGUAb+uWm2kLGflyWdwXOv2KjcTvWlAKRj3JvAK/CJPJdsHE6/aK/TrWXjDZ6kgmqoR1t5CrZimoFVC78OQRK6zY2yP3+iSN3GWHd1ysZYnD/f1Z84MWtn66Bglr57VNHrGkOnRaNlOvs1mLgKcCIlyF02sYVgJ0zZQmo1YcbsaaZTfiWoyuImIDLVqX/40CbktHBoLj4xLVOwNrHWrBhapGQgGXfX9unJqF/0Xi8dA3uzTgIfS7Zfw/tmc7uODAuC/sfue678OUXxrjIPMV8k4h9PjPneUQAsn/L/gn99iB6T8lqNf5ZyCcD3PKuK9k/JWqRfyPVoFIltG34o4EsQBBQnO/oth1HYl6bZ0aUssEVAsUhsaHCJ6LzySFpi8DQUGBpDKHFlK8qFkIz5dfv0hTE5JZOYaqoDHd8SU06fcVVIiJ+D4QSVdOBgZByiBN0eA2QcII+/XYs8fgAg5n/Ak1tPLrNl7qg8uaVi31VkzjSmQZJYaML+3xTb0YXAiV4nxQ7o1I7m8xEmf7epyl+++obX9JTezNe/cyLL/Jbzd5sLJdn2Z9L2z7q6Batiy+pfROcH2FgUIzvq71vPpGWuXMdnehVut+t7fOF4x8lU5GOwKUz4Kl6atgqJ4IsBT8mS44CBYCjQ3+qj7NkJmE2r6ok9xHuuB1fOSXDA40RGDkWOMHXI72Uld4UTcYuiouXSaSCQM8CytdRwYULY0CI7+MoFcHmHmkut+7hqo5liwYvWug3tWIesS2LKIHPJFcX8FQOSyh2l57CG4h7I3R/FkkGqL5Mphn2fwouJZIUrOcF2Qy2kTIa3Sw6bU43w/4rp4YwdEGo7jReiRHfkvNZo+XAU9qbvtX1HaPEgBpRhs8RButBMfF2XlXgXR+kABD8Kvcbzluq2/W/lZrTqXwHQLba7uMZkZxqzv5x2ub1/76oFqnmtCXwZXbHS3tHnhsuB7SrD2xHuiE1dzXOTvJ4XbvoyPRx12dhC+Bx6k9l8WpLe6ljItdrV+2vlpp4j09MkXzwVp1/4i0847q8/6VR6Pzdc6oV/B/aB/tGaYYYpnMfrMWeaP5IJ3tGEhWNAtyjJIWsN7ReaqilD59m2l1HtNfNx9eA/7q1eGwEN25PmxI2a/rrMHjIv2R7mq7IsaX0xdj/qUis/9/lu53tzaqC6mY42Ubuz3iK0nKIYQq7+t4ta49IMmNTiLytyDqHS0qibRgFSK3DIdXd7WPtHlC65C/1leaNXu8ezX3uV7Zee9H9NGBvvR7NdUhofh9uuxAT+7VkOoXSLKWZOKMRY0ROULeuXzXfUcfY4h3h77VaG+ki1EE5ritDW2d7MXy+lZD7MZkb9zv1OLu//qot5dD4++/8mukmVzPFeg5E/YthPhbuJDf90RjAntchiRqF9HN8h6LegfWOtICpQf0bAGoCs7AEOjRwq8m+gToiT43fT8EehKllqpVodXvAnq5QMKz93f5+QnQ6/d9CPTYI6AnGRxGD5nUb1ERaX1WjUJzvTQWthpVl8fZ3+8qnH8Sq/k/wV6kY//P//vfAAAA//8YxZul") + unpacked := packer.MustUnpack("eJzMeluXorq69v33M+bttw8cylqLPca6EGsRQIuaYhcJuSOJAhrQ1YKKe+z/vkfCQUCrq7vnHD33RQ2tGHJ48x6e5wn//dvxsKb/GR2yfz+uv57WX/+jyvhv//UbyawCf9nHy8D0FoHHaY45jQ9bApfPDrDOZKVeMXI1jJx5iFwlgjgJ9Ye/5fS6j2HlFP7KOToztwjhJMFaUGA4URZZUIbQPWK4NJjtqlj2+aivesLg3VjP1HMIva8LiI8YBoqTnmMnVS35mQ3mLzGwlDAwrsx2eQjV6/18LpvlrkpAcH2L97EzU+JQM87rwFCIahwj5Cl1+zR2ZuaBgaB4S82MgICzadeukOs+juDkzJB/naXTuh0YJdK8E8nwMYKe8paaJdGMs/h9sTKLEE2fu762mTAQPzug3tOtvb+2pm2mxDQLCqJjjrSCr7/s57ff6r9ICyZvqZmEmsep7m1CZB5k3+VPjVNhZJ5o7h9IRvt9Csd2OYGGhgPjK0a7237aPyDHjUPIOMmXc/kMwIe11Z6J8uzYhdHYJIvgRcHI3bDMOjLY37d5xfDCQ90/0e0jW9fzMJufcbdHUwvhRcXodbCuxcpMKFC6tRDb53R72zvVgiOGnkJ0987ud/PW450Y8s8MLYe2ac8yZ3sMn54dcOEkY0o0i3drjZfUDhSqKwfn5Sl+nZkJyZZxBKzrSgsm85n/N6IHiuizWZ1jVwuOIfKUCHpXDK0q1OJ8vtz/47d/qwN4nbPDPs2LUfj6cLKjwDiQfBm/a8GWIffA7N081NTdW2pykvlnovGSzdQrhp5KM66sl4dEHDXOrC172cf4NkaBQaDNcpkODqH2/uy8hPrbSzwnwMiRLlw4qU0G/ITm7EC2+9hJjdcIulWI3MlCabfxeuqt7UR1P2Hg/STGWWhBiW3zFImQX+1L0ebIMS8HkgdPb+k0XWjGmc0MiwDrygDfLpTeM7qnhMjnC+1ywpXR26Pyr0Um2py5MzP1CE52RGdXMd7yeqDIMiuisSqESuxn/IiRR9E/O7OL790cyLpcGbAUHFyo3Lt1eThPqCU81IpNBCei/5G87OeLlcnXINgiDR8IeG9c0zyHyN+LtfTtTW9nljb9Epqxzi0Xq2nKsqCKIJ44TRsDvMDQUMXZvV6ncwqMK7PEeJ4SwsvxLd4XDgieMPQ2WOyzDfs2Bc3cx36xcup+wKqw3oVU4czcbmynt67FSlUpEPb0+ai9wsg7MeRuMXpNe+N8MO+gf7nO+PnRXr3t9DzLzQoHhkoyXgqfIuD8PEuVGKOEh6oh0gtvbUeBpUQv+9jJer6DPB7qQRUhv7NnU8Lmt3CfpgxO7vZ8v5YudT9OeU27TD/cGKWqb6U7U6Zax/YnFLw3Z4cTYgf8tk6lH7dzaYOMHxkIKqSP+toeJyDYMmBUb6l5ILmpMvt13k+rGE6SMLtw3JS8cXperEyF5gGXe2rHkz7nnzp799NqahIsxkPLMoTeFiPvijTrHNVl5NimzkVWHMLMKsNAmT8qk4vMOkdLGVO9ctrG0rBM0zw49krBN0vXZ+VzsTIrDNUTy4KN7NsvE43NIo2nAr78wHwdbOqf10P73ew+2ONiNTq/mRK3ZaS/DpbxawSNchYf9AgE5VtqHjGc5AzEe9cu6jHtYWkiICix7u9FaWr9cbPaxb+n07MDrBLPzH2IvAVGOzFGUxJ9YzGb5hheEqr7h1D3eIjcbTSjh1nmnUQM0cwSfiLy8G6tuyoRMAO+l7KfrcTOFyV2NasiX0LFPd9K3ibla7KO7kqeSGnQ5SFatmVOhn2YBQmbHur0mZqkQ4y5x5kdnBcZP5LVhJPMSgkIdr9DESYeH6DLtm/uc4LMoywzPUSJM+tItfd0MZumi/f6k0CrlOgHBiWbTQqi+fx3FBcUWNuoUmtTz76Fer+JkI9EY3kEJ/kiu3CWBcffoc/DPMgdroyQubCJf13IshOkGFrKpykqlYjnX8LtkMZLDIKn1vUk0vqyj0X5o2cZggeSHUTK2lDdrzC0CqSLkipD6NQhrHtULFIpp7m/CSFWhPvXoS7Tm0BNJ6y/ypAg0DqP0/coNVYMXgZpsEHzCQGXDQPGhgB+ZS99pFuj93bN9NwP5Y8RfNsHo2SLkalIn5LuHCQEvcqzj+BSfnbpU56ze6aZIVOeCGVxTqO1PmAeYi1cwfJcejbNX392HzebZ0FGdLdJ3aIEyzhqzgpXRFOeHdCg1XNb6v5+a9O7Pc+bUitCWqR/uQekiXWr14/ObbzeCPmcfLnfx2DO84cpf5j+RuxJlqnMKql2EXBzkILbdQ0ZhLCBmlDbHDCEuv1ywg2Ek9/79pZ+gQXjOTUQS8bJkB2ZIl5LNjOuDPgSKlPd30XwaTRPoMk8oPtbKtYHvPMH46jYnj47drCj0+Faaljtn0KtEPuIMTC2kRZUo3GORKMnmgW7CHkbql1OTEBo4VOy7fV+/5VxXSNPPPfs2N5EPNPa4XtKJEMeR5qhMttUeyXs0+eGrP4WPx+z2QYyAUEzrC4/LbJJQmBwFbkY/0BpH81fqwzIE2V4/i0Weyvhzl1MNRBAWSOTNz49Zq+FAyydVuqWAHHWLGHA249+u77e7J+s86DCK7X2D5Co4S0O6jEAPjEBE/NdPweVDFgHknX+UTig8Z3b83moTwtqBynVgxuEB4nCbHPTh54dc7Z9hYLDlWhPtzbNyiLtnzeocouRwgHqhdm350kWKDi7nNhtf6fXa6iGSPjmsu8DPV9VRjEl/ufXwTy2v6U33xMx1PUVMH99+01QRuHrvxZOA15GmYTTAhOUElZn+PSWmn/b1NB3S3Sz81OcuyeRP0d1UdYG3MNGPwIxb/hp/4HioUj/pYKuZO9zpiVc5LM+ZOzNXf4J8PHozFiFoX+gFT2KPbpakeCsSNzqHLsiD0kZwNu71W7eQUa+XhePhU6/ZiPxuxZUglHPMq/AL8Jksl0w8aa9Zr+OdRRMtj6SmSph3T3kalgK6gTUPjz5iBXW7O0j9/okjTxkhw9crGOJ4/39WfODDrZ+ugYJa5eNTT5iSU3odGymWWe7FgFPBUS4Cae3MKwF6IYpzSadONyOtcjuxLUY3UTEFlp0Lt+IlS1D30RwIsLrsYAoYGHrB3njB+nkTLTLIdR3ZQSXj+Zq00b5Ouv6zn+hcNzRoJHQ+XFpHJy9tQ214EpVI6GAy74/N07D/v8i0XocE3368SHkvFcafmzPdnDFgfFYUP7OdT+GR39ojFLmSeSdQ+jxnzvLMfSS/1/xT+6xRw9+Sshv8t5IsB/nsulQQfkr1ZL4H53ykayjr8UD6WMFgoTmfk3tm/oVDdp6tauRJ6gWKAxNywheis+kjLYvA0FBgaRQZQdlXtQshJfrH7/AUxOSWTmGqqBP/fElJBr2FRSMifguiaRJZwODIGWQpmh0iyHhi/16GthjdPGHM/6EWjr7ZR+v9dElUafSehumcSWyjApDxtf29K5OjS6iKnE+KPcmJPf3GImzfT0t0uN31NS/pBYP5n14CZff5be7PFjbbkjvvhcu/lUQMVsXX1P6ILi+wEChGd82OmJzE65yZruHUGv0xvvb7itGvkpnkwMBymfB0vZVMFTPBFgK/kyPHAULgcYOf1GfFshMQu1YNEnuW3rkbXzkVwyOtEtg5FjjJa4mR6lnvKg7DF0VVy4TyYQBnoW1niQDilZGgZFfRdBrAsw80RogdddfDbeTBWNw/TS4klFP2JZFtMQzqTEoGCrlGqrddYvgNMLeGC2fRaIhmi+DeZEtT4LjiSS1yHlBZpNdhLxWf5vfivPjgO/rmBGc7DCKW41JcvO31Gz3eB3xs/Ytgw213ZMEQJrRBQ/RJhvBrXEmwNyyDlYBMASvy/2OW3e6auNvjdZVCd8hsLsuzGhmFPd6l3+6tXndWw/1OtWEvoyu+h5oeI/AY8sxkWYdifWBPlnPfZuzlxzu9z45EX3a12f4Gnic2ktZnLriXsm4ODS6aeertRY/0EFTtBytVfdPSLscqL4cXrW0+mLvjAYF/4f20Z1hiiGWyewXa6B3WhPS2YGBZEOzIMco6QDrA32pLkrp09eF1uQx/XX3zevHP3plOS5ED64tWzL4x/Xd4OMi/S2913ZFjK/nL8by95pE//9Fejzc26gppGKOl33sDgi3JCllCFU+1A8bPX1Ezlt9RuRvBi+8p4ElkRZsQuRW4fjKsPGRLk9oPfLX+Uq7Zo/3rwQ/10l7z/2ILjvStX6tljsmFL9ODx7p2IMaUr/5JGtJJs5Y1BiRI+Rdz3fdswwxhnh27Fut5kf6GEVgjvva0NXJQSxv77XLfkwOxv1ODfDxW0+DvZStv/+Rt6AWcj03oOfM2NcQ4q/hSn4/Eo0J7HEdk6hDRHfrRyzqHVjbSAuUAdCzBaAqOANjoEcLv57oE6An+tz1/SbQkyi1Uq0arX4X0MsFEl68v8vPT4DesO+HQI99BPQkg8PoQyb1S9RL2pxVq9DcLquFrSb1pXX294fK6p/Eav5PsBfp2P/z//43AAD//+9xxHg=") SupportedMap = make(map[string]Spec) for f, v := range unpacked { diff --git a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go index 81f03c3b009..2694ed1cd3f 100644 --- a/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go +++ b/x-pack/elastic-agent/pkg/basecmd/version/cmd_test.go @@ -54,7 +54,7 @@ func TestCmdBinaryOnlyYAML(t *testing.T) { } func TestCmdDaemon(t *testing.T) { - srv := server.New(newErrorLogger(t), nil, nil) + srv := server.New(newErrorLogger(t), nil, nil, nil) require.NoError(t, srv.Start()) defer srv.Stop() @@ -70,7 +70,7 @@ func TestCmdDaemon(t *testing.T) { } func TestCmdDaemonYAML(t *testing.T) { - srv := server.New(newErrorLogger(t), nil, nil) + srv := server.New(newErrorLogger(t), nil, nil, nil) require.NoError(t, srv.Start()) defer srv.Stop() diff --git a/x-pack/elastic-agent/pkg/capabilities/capabilities.go b/x-pack/elastic-agent/pkg/capabilities/capabilities.go index 498e09ac101..b03bef73d8c 100644 --- a/x-pack/elastic-agent/pkg/capabilities/capabilities.go +++ b/x-pack/elastic-agent/pkg/capabilities/capabilities.go @@ -8,6 +8,8 @@ import ( "errors" "os" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -44,7 +46,7 @@ func Load(capsFile string, log *logger.Logger, sc status.Controller) (Capability cm := &capabilitiesManager{ caps: make([]Capability, 0), - reporter: sc.RegisterWithPersistance("capabilities", true), + reporter: sc.RegisterComponentWithPersistance("capabilities", true), } // load capabilities from file @@ -85,7 +87,7 @@ func Load(capsFile string, log *logger.Logger, sc status.Controller) (Capability func (mgr *capabilitiesManager) Apply(in interface{}) (interface{}, error) { var err error // reset health on start, child caps will update to fail if needed - mgr.reporter.Update(status.Healthy) + mgr.reporter.Update(state.Healthy, "") for _, cap := range mgr.caps { in, err = cap.Apply(in) if err != nil { diff --git a/x-pack/elastic-agent/pkg/capabilities/capabilities_test.go b/x-pack/elastic-agent/pkg/capabilities/capabilities_test.go index d59b7c62939..46107463151 100644 --- a/x-pack/elastic-agent/pkg/capabilities/capabilities_test.go +++ b/x-pack/elastic-agent/pkg/capabilities/capabilities_test.go @@ -135,7 +135,7 @@ func TestCapabilityManager(t *testing.T) { caps: []Capability{ filterKeywordCap{keyWord: "filter"}, }, - reporter: status.NewController(l).Register("test"), + reporter: status.NewController(l).RegisterComponent("test"), } newIn, err := mgr.Apply(m) @@ -160,7 +160,7 @@ func TestCapabilityManager(t *testing.T) { filterKeywordCap{keyWord: "filter"}, blockCap{}, }, - reporter: status.NewController(l).Register("test"), + reporter: status.NewController(l).RegisterComponent("test"), } newIn, err := mgr.Apply(m) @@ -185,7 +185,7 @@ func TestCapabilityManager(t *testing.T) { filterKeywordCap{keyWord: "filter"}, blockCap{}, }, - reporter: status.NewController(l).Register("test"), + reporter: status.NewController(l).RegisterComponent("test"), } newIn, err := mgr.Apply(m) @@ -210,7 +210,7 @@ func TestCapabilityManager(t *testing.T) { filterKeywordCap{keyWord: "filter"}, keepAsIsCap{}, }, - reporter: status.NewController(l).Register("test"), + reporter: status.NewController(l).RegisterComponent("test"), } newIn, err := mgr.Apply(m) @@ -235,7 +235,7 @@ func TestCapabilityManager(t *testing.T) { filterKeywordCap{keyWord: "filter"}, keepAsIsCap{}, }, - reporter: status.NewController(l).Register("test"), + reporter: status.NewController(l).RegisterComponent("test"), } newIn, err := mgr.Apply(m) @@ -260,7 +260,7 @@ func TestCapabilityManager(t *testing.T) { filterKeywordCap{keyWord: "filter"}, filterKeywordCap{keyWord: "key"}, }, - reporter: status.NewController(l).Register("test"), + reporter: status.NewController(l).RegisterComponent("test"), } newIn, err := mgr.Apply(m) @@ -283,7 +283,7 @@ func TestCapabilityManager(t *testing.T) { filterKeywordCap{keyWord: "key"}, filterKeywordCap{keyWord: "filter"}, }, - reporter: status.NewController(l).Register("test"), + reporter: status.NewController(l).RegisterComponent("test"), } newIn, err := mgr.Apply(m) diff --git a/x-pack/elastic-agent/pkg/capabilities/input.go b/x-pack/elastic-agent/pkg/capabilities/input.go index 7ddc1a22496..6515bd5b715 100644 --- a/x-pack/elastic-agent/pkg/capabilities/input.go +++ b/x-pack/elastic-agent/pkg/capabilities/input.go @@ -7,6 +7,8 @@ package capabilities import ( "fmt" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -150,8 +152,9 @@ func (c *inputCapability) renderInputs(inputs []map[string]interface{}) ([]map[s input[conditionKey] = isSupported if !isSupported { - c.log.Errorf("input '%s' is left out due to capability restriction '%s'", inputType, c.name()) - c.reporter.Update(status.Degraded) + msg := fmt.Sprintf("input '%s' is left out due to capability restriction '%s'", inputType, c.name()) + c.log.Errorf(msg) + c.reporter.Update(state.Degraded, msg) } newInputs = append(newInputs, input) diff --git a/x-pack/elastic-agent/pkg/capabilities/input_test.go b/x-pack/elastic-agent/pkg/capabilities/input_test.go index 8416a81649b..7a2707d8f83 100644 --- a/x-pack/elastic-agent/pkg/capabilities/input_test.go +++ b/x-pack/elastic-agent/pkg/capabilities/input_test.go @@ -8,11 +8,12 @@ import ( "fmt" "testing" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) @@ -394,5 +395,5 @@ func getInputsMap(tt ...string) map[string]interface{} { type testReporter struct{} -func (*testReporter) Update(status.AgentStatus) {} -func (*testReporter) Unregister() {} +func (*testReporter) Update(state.Status, string) {} +func (*testReporter) Unregister() {} diff --git a/x-pack/elastic-agent/pkg/capabilities/output.go b/x-pack/elastic-agent/pkg/capabilities/output.go index 34a9ca6e055..bf47123f337 100644 --- a/x-pack/elastic-agent/pkg/capabilities/output.go +++ b/x-pack/elastic-agent/pkg/capabilities/output.go @@ -7,6 +7,8 @@ package capabilities import ( "fmt" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" @@ -129,8 +131,9 @@ func (c *outputCapability) renderOutputs(outputs map[string]interface{}) (map[st outputs[outputName] = output if !isSupported { - c.log.Errorf("output '%s' is left out due to capability restriction '%s'", outputName, c.name()) - c.reporter.Update(status.Degraded) + msg := fmt.Sprintf("output '%s' is left out due to capability restriction '%s'", outputName, c.name()) + c.log.Errorf(msg) + c.reporter.Update(state.Degraded, msg) } } diff --git a/x-pack/elastic-agent/pkg/capabilities/upgrade.go b/x-pack/elastic-agent/pkg/capabilities/upgrade.go index 4ca6f9074d4..8712529c841 100644 --- a/x-pack/elastic-agent/pkg/capabilities/upgrade.go +++ b/x-pack/elastic-agent/pkg/capabilities/upgrade.go @@ -8,6 +8,8 @@ import ( "fmt" "strings" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" @@ -125,8 +127,9 @@ func (c *upgradeCapability) Apply(upgradeMap map[string]interface{}) (map[string // if deny switch the logic if c.Type == denyKey { isSupported = !isSupported - c.log.Errorf("upgrade is blocked out due to capability restriction '%s'", c.name()) - c.reporter.Update(status.Degraded) + msg := fmt.Sprintf("upgrade is blocked out due to capability restriction '%s'", c.name()) + c.log.Errorf(msg) + c.reporter.Update(state.Degraded, msg) } if !isSupported { diff --git a/x-pack/elastic-agent/pkg/core/logger/logger.go b/x-pack/elastic-agent/pkg/core/logger/logger.go index 9ae4c78834a..3e70cd88e57 100644 --- a/x-pack/elastic-agent/pkg/core/logger/logger.go +++ b/x-pack/elastic-agent/pkg/core/logger/logger.go @@ -43,7 +43,7 @@ func NewWithLogpLevel(name string, level logp.Level) (*Logger, error) { return new(name, defaultCfg) } -//NewFromConfig takes the user configuration and generate the right logger. +// NewFromConfig takes the user configuration and generate the right logger. // TODO: Finish implementation, need support on the library that we use. func NewFromConfig(name string, cfg *Config) (*Logger, error) { return new(name, cfg) diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index 1de2feb559f..4586505db8d 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -85,21 +85,24 @@ func NewApplication( b, _ := tokenbucket.NewTokenBucket(ctx, 3, 3, 1*time.Second) return &Application{ - bgContext: ctx, - id: id, - name: appName, - pipelineID: pipelineID, - logLevel: logLevel, - desc: desc, - srv: srv, - processConfig: cfg.ProcessConfig, - logger: logger, - limiter: b, + bgContext: ctx, + id: id, + name: appName, + pipelineID: pipelineID, + logLevel: logLevel, + desc: desc, + srv: srv, + processConfig: cfg.ProcessConfig, + logger: logger, + limiter: b, + state: state.State{ + Status: state.Stopped, + }, reporter: reporter, monitor: monitor, uid: uid, gid: gid, - statusReporter: statusController.Register(id), + statusReporter: statusController.RegisterApp(id, appName), }, nil } @@ -231,25 +234,6 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState { return resChan } -func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string, payload map[string]interface{}) { - var status state.Status - switch pstatus { - case proto.StateObserved_STARTING: - status = state.Starting - case proto.StateObserved_CONFIGURING: - status = state.Configuring - case proto.StateObserved_HEALTHY: - status = state.Running - case proto.StateObserved_DEGRADED: - status = state.Degraded - case proto.StateObserved_FAILED: - status = state.Failed - case proto.StateObserved_STOPPING: - status = state.Stopping - } - a.setState(status, msg, payload) -} - func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) { if a.state.Status != s || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) { a.state.Status = s @@ -258,15 +242,7 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in if a.reporter != nil { go a.reporter.OnStateChange(a.id, a.name, a.state) } - - switch s { - case state.Configuring, state.Restarting, state.Starting, state.Stopping, state.Updating: - // no action - case state.Crashed, state.Failed, state.Degraded: - a.statusReporter.Update(status.Degraded) - default: - a.statusReporter.Update(status.Healthy) - } + a.statusReporter.Update(s, msg) } } diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/configure.go b/x-pack/elastic-agent/pkg/core/plugin/process/configure.go index fca2ea0f9c1..23ebdafbf60 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/configure.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/configure.go @@ -11,7 +11,6 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/status" ) // Configure configures the application with the passed configuration. @@ -20,9 +19,7 @@ func (a *Application) Configure(_ context.Context, config map[string]interface{} if err != nil { // inject App metadata err = errors.New(err, errors.M(errors.MetaKeyAppName, a.name), errors.M(errors.MetaKeyAppName, a.id)) - a.statusReporter.Update(status.Degraded) - } else { - a.statusReporter.Update(status.Healthy) + a.statusReporter.Update(state.Degraded, err.Error()) } }() diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/status.go b/x-pack/elastic-agent/pkg/core/plugin/process/status.go index 473ae9a70c7..21ded667101 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/status.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/status.go @@ -28,7 +28,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St return } - a.setStateFromProto(status, msg, payload) + a.setState(state.FromProto(status), msg, payload) if status == proto.StateObserved_FAILED { // ignore when expected state is stopping if s.Expected() == proto.StateExpected_STOPPING { diff --git a/x-pack/elastic-agent/pkg/core/plugin/service/app.go b/x-pack/elastic-agent/pkg/core/plugin/service/app.go index 8e336338f8f..97196e0307f 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/service/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/service/app.go @@ -90,22 +90,25 @@ func NewApplication( b, _ := tokenbucket.NewTokenBucket(ctx, 3, 3, 1*time.Second) return &Application{ - bgContext: ctx, - id: id, - name: appName, - pipelineID: pipelineID, - logLevel: logLevel, - desc: desc, - srv: srv, - processConfig: cfg.ProcessConfig, - logger: logger, - limiter: b, + bgContext: ctx, + id: id, + name: appName, + pipelineID: pipelineID, + logLevel: logLevel, + desc: desc, + srv: srv, + processConfig: cfg.ProcessConfig, + logger: logger, + limiter: b, + state: state.State{ + Status: state.Stopped, + }, reporter: reporter, monitor: monitor, uid: uid, gid: gid, credsPort: credsPort, - statusReporter: statusController.Register(id), + statusReporter: statusController.RegisterApp(id, appName), }, nil } @@ -207,9 +210,7 @@ func (a *Application) Configure(_ context.Context, config map[string]interface{} if err != nil { // inject App metadata err = errors.New(err, errors.M(errors.MetaKeyAppName, a.name), errors.M(errors.MetaKeyAppName, a.id)) - a.statusReporter.Update(status.Degraded) - } else { - a.statusReporter.Update(status.Healthy) + a.statusReporter.Update(state.Degraded, err.Error()) } }() @@ -287,26 +288,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St return } - a.setStateFromProto(status, msg, payload) -} - -func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string, payload map[string]interface{}) { - var status state.Status - switch pstatus { - case proto.StateObserved_STARTING: - status = state.Starting - case proto.StateObserved_CONFIGURING: - status = state.Configuring - case proto.StateObserved_HEALTHY: - status = state.Running - case proto.StateObserved_DEGRADED: - status = state.Degraded - case proto.StateObserved_FAILED: - status = state.Failed - case proto.StateObserved_STOPPING: - status = state.Stopping - } - a.setState(status, msg, payload) + a.setState(state.FromProto(status), msg, payload) } func (a *Application) setState(s state.Status, msg string, payload map[string]interface{}) { @@ -317,15 +299,7 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in if a.reporter != nil { go a.reporter.OnStateChange(a.id, a.name, a.state) } - - switch s { - case state.Configuring, state.Restarting, state.Starting, state.Stopping, state.Updating: - // no action - case state.Crashed, state.Failed, state.Degraded: - a.statusReporter.Update(status.Degraded) - default: - a.statusReporter.Update(status.Healthy) - } + a.statusReporter.Update(s, msg) } } diff --git a/x-pack/elastic-agent/pkg/core/state/state.go b/x-pack/elastic-agent/pkg/core/state/state.go index 670cdc2a2f2..98319ad3315 100644 --- a/x-pack/elastic-agent/pkg/core/state/state.go +++ b/x-pack/elastic-agent/pkg/core/state/state.go @@ -5,6 +5,8 @@ package state import ( + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process" ) @@ -13,27 +15,57 @@ type Status int const ( // Stopped is status describing not running application. - Stopped Status = iota + Stopped Status = -4 + // Crashed is status describing application is crashed. + Crashed Status = -3 + // Restarting is status describing application is restarting. + Restarting Status = -2 + // Updating is status describing application is updating. + Updating Status = -1 + // Starting is status describing application is starting. - Starting + Starting = Status(proto.StateObserved_STARTING) // Configuring is status describing application is configuring. - Configuring - // Running is status describing application is running. - Running + Configuring = Status(proto.StateObserved_CONFIGURING) + // Healthy is status describing application is running. + Healthy = Status(proto.StateObserved_HEALTHY) // Degraded is status describing application is degraded. - Degraded + Degraded = Status(proto.StateObserved_DEGRADED) // Failed is status describing application is failed. - Failed + Failed = Status(proto.StateObserved_FAILED) // Stopping is status describing application is stopping. - Stopping - // Crashed is status describing application is crashed. - Crashed - // Restarting is status describing application is restarting. - Restarting - // Updating is status describing application is updating. - Updating + Stopping = Status(proto.StateObserved_STOPPING) ) +// IsInternal returns true if the status is an internal status and not something that should be reported +// over the protocol as an actual status. +func (s Status) IsInternal() bool { + return s < Starting +} + +// ToProto converts the status to status that is compatible with the protocol. +func (s Status) ToProto() proto.StateObserved_Status { + if !s.IsInternal() { + return proto.StateObserved_Status(s) + } + if s == Updating || s == Restarting { + return proto.StateObserved_STARTING + } + if s == Crashed { + return proto.StateObserved_FAILED + } + if s == Stopped { + return proto.StateObserved_STOPPING + } + // fallback to degraded + return proto.StateObserved_DEGRADED +} + +// FromProto converts the status from protocol to status Agent representation. +func FromProto(s proto.StateObserved_Status) Status { + return Status(s) +} + // State wraps the process state and application status. type State struct { ProcessInfo *process.Info diff --git a/x-pack/elastic-agent/pkg/core/status/reporter.go b/x-pack/elastic-agent/pkg/core/status/reporter.go index 2d3517a434f..d4abd96a990 100644 --- a/x-pack/elastic-agent/pkg/core/status/reporter.go +++ b/x-pack/elastic-agent/pkg/core/status/reporter.go @@ -9,55 +9,71 @@ import ( "github.com/google/uuid" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" ) -// AgentStatus represents a status of agent. -type AgentStatus int - -// UpdateFunc is used by components to notify reporter about status changes. -type UpdateFunc func(AgentStatus) +// AgentStatusCode is the status code for the Elastic Agent overall. +type AgentStatusCode int const ( // Healthy status means everything is fine. - Healthy AgentStatus = iota + Healthy AgentStatusCode = iota // Degraded status means something minor is preventing agent to work properly. Degraded // Failed status means agent is unable to work properly. Failed ) -var ( - humanReadableStatuses = map[AgentStatus]string{ - Healthy: "online", - Degraded: "degraded", - Failed: "error", - } -) +// String returns the string value for the agent code. +func (s AgentStatusCode) String() string { + return []string{"online", "degraded", "error"}[s] +} + +// AgentApplicationStatus returns the status of specific application. +type AgentApplicationStatus struct { + ID string + Name string + Status state.Status + Message string +} + +// AgentStatus returns the overall status of the Elastic Agent. +type AgentStatus struct { + Status AgentStatusCode + Message string + Applications []AgentApplicationStatus +} // Controller takes track of component statuses. type Controller interface { - Register(string) Reporter - RegisterWithPersistance(string, bool) Reporter + RegisterComponent(string) Reporter + RegisterComponentWithPersistance(string, bool) Reporter + RegisterApp(id string, name string) Reporter Status() AgentStatus + StatusCode() AgentStatusCode StatusString() string UpdateStateID(string) } type controller struct { - lock sync.Mutex - status AgentStatus - reporters map[string]*reporter - log *logger.Logger - stateID string + mx sync.Mutex + status AgentStatusCode + reporters map[string]*reporter + appReporters map[string]*reporter + log *logger.Logger + stateID string } // NewController creates a new reporter. func NewController(log *logger.Logger) Controller { return &controller{ - status: Healthy, - reporters: make(map[string]*reporter), - log: log, + status: Healthy, + reporters: make(map[string]*reporter), + appReporters: make(map[string]*reporter), + log: log, } } @@ -68,82 +84,145 @@ func (r *controller) UpdateStateID(stateID string) { return } - r.lock.Lock() + r.mx.Lock() r.stateID = stateID - // cleanup status + // cleanup status for component reporters + // the status of app reports remain the same for _, rep := range r.reporters { if !rep.isRegistered { continue } - rep.lock.Lock() + rep.mx.Lock() if !rep.isPersistent { - rep.status = Healthy + rep.status = state.Configuring + rep.message = "" } - rep.lock.Unlock() + rep.mx.Unlock() } - r.lock.Unlock() + r.mx.Unlock() r.updateStatus() } // Register registers new component for status updates. -func (r *controller) Register(componentIdentifier string) Reporter { - return r.RegisterWithPersistance(componentIdentifier, false) +func (r *controller) RegisterComponent(componentIdentifier string) Reporter { + return r.RegisterComponentWithPersistance(componentIdentifier, false) } -func (r *controller) RegisterWithPersistance(componentIdentifier string, persistent bool) Reporter { +// Register registers new component for status updates. +func (r *controller) RegisterComponentWithPersistance(componentIdentifier string, persistent bool) Reporter { id := componentIdentifier + "-" + uuid.New().String()[:8] rep := &reporter{ + name: componentIdentifier, isRegistered: true, unregisterFunc: func() { - r.lock.Lock() + r.mx.Lock() delete(r.reporters, id) - r.lock.Unlock() + r.mx.Unlock() }, notifyChangeFunc: r.updateStatus, isPersistent: persistent, } - r.lock.Lock() + r.mx.Lock() r.reporters[id] = rep - r.lock.Unlock() + r.mx.Unlock() + + return rep +} + +// RegisterApp registers new component for status updates. +func (r *controller) RegisterApp(componentIdentifier string, name string) Reporter { + id := componentIdentifier + "-" + uuid.New().String()[:8] + rep := &reporter{ + name: name, + status: state.Stopped, + isRegistered: true, + unregisterFunc: func() { + r.mx.Lock() + delete(r.appReporters, id) + r.mx.Unlock() + }, + notifyChangeFunc: r.updateStatus, + } + + r.mx.Lock() + r.appReporters[id] = rep + r.mx.Unlock() return rep } // Status retrieves current agent status. func (r *controller) Status() AgentStatus { + r.mx.Lock() + defer r.mx.Unlock() + apps := make([]AgentApplicationStatus, 0, len(r.appReporters)) + for key, rep := range r.appReporters { + rep.mx.Lock() + apps = append(apps, AgentApplicationStatus{ + ID: key, + Name: rep.name, + Status: rep.status, + Message: rep.message, + }) + rep.mx.Unlock() + } + return AgentStatus{ + Status: r.status, + Message: "", + Applications: apps, + } +} + +// StatusCode retrieves current agent status code. +func (r *controller) StatusCode() AgentStatusCode { + r.mx.Lock() + defer r.mx.Unlock() return r.status } func (r *controller) updateStatus() { status := Healthy - r.lock.Lock() + r.mx.Lock() for id, rep := range r.reporters { - s := rep.status + s := statusToAgentStatus(rep.status) if s > status { status = s } - r.log.Debugf("'%s' has status '%s'", id, humanReadableStatuses[s]) + r.log.Debugf("'%s' has status '%s'", id, s) if status == Failed { break } } + if status != Failed { + for id, rep := range r.appReporters { + s := statusToAgentStatus(rep.status) + if s > status { + status = s + } + + r.log.Debugf("'%s' has status '%s'", id, s) + if status == Failed { + break + } + } + } if r.status != status { r.logStatus(status) r.status = status } - r.lock.Unlock() + r.mx.Unlock() } -func (r *controller) logStatus(status AgentStatus) { +func (r *controller) logStatus(status AgentStatusCode) { logFn := r.log.Infof if status == Degraded { logFn = r.log.Warnf @@ -151,37 +230,40 @@ func (r *controller) logStatus(status AgentStatus) { logFn = r.log.Errorf } - logFn("Elastic Agent status changed to: '%s'", humanReadableStatuses[status]) + logFn("Elastic Agent status changed to: '%s'", status) } // StatusString retrieves human readable string of current agent status. func (r *controller) StatusString() string { - return humanReadableStatuses[r.Status()] + return r.StatusCode().String() } // Reporter reports status of component type Reporter interface { - Update(AgentStatus) + Update(state.Status, string) Unregister() } type reporter struct { - lock sync.Mutex + name string + mx sync.Mutex isPersistent bool isRegistered bool - status AgentStatus + status state.Status + message string unregisterFunc func() notifyChangeFunc func() } // Update updates the status of a component. -func (r *reporter) Update(s AgentStatus) { - r.lock.Lock() - defer r.lock.Unlock() +func (r *reporter) Update(s state.Status, message string) { + r.mx.Lock() + defer r.mx.Unlock() if !r.isRegistered { return } + r.message = message if r.status != s { r.status = s r.notifyChangeFunc() @@ -191,10 +273,21 @@ func (r *reporter) Update(s AgentStatus) { // Unregister unregister status from reporter. Reporter will no longer be taken into consideration // for overall status computation. func (r *reporter) Unregister() { - r.lock.Lock() - defer r.lock.Unlock() + r.mx.Lock() + defer r.mx.Unlock() r.isRegistered = false r.unregisterFunc() r.notifyChangeFunc() } + +func statusToAgentStatus(status state.Status) AgentStatusCode { + s := status.ToProto() + if s == proto.StateObserved_DEGRADED { + return Degraded + } + if s == proto.StateObserved_FAILED { + return Failed + } + return Healthy +} diff --git a/x-pack/elastic-agent/pkg/core/status/reporter_test.go b/x-pack/elastic-agent/pkg/core/status/reporter_test.go index 5706f42c22a..55fcd3e04fe 100644 --- a/x-pack/elastic-agent/pkg/core/status/reporter_test.go +++ b/x-pack/elastic-agent/pkg/core/status/reporter_test.go @@ -10,85 +10,92 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state" ) func TestReporter(t *testing.T) { l, _ := logger.New("") t.Run("healthy by default", func(t *testing.T) { r := NewController(l) - assert.Equal(t, Healthy, r.Status()) + assert.Equal(t, Healthy, r.StatusCode()) assert.Equal(t, "online", r.StatusString()) }) t.Run("healthy when all healthy", func(t *testing.T) { r := NewController(l) - r1 := r.Register("r1") - r2 := r.Register("r2") - r3 := r.Register("r3") - - r1.Update(Healthy) - r2.Update(Healthy) - r3.Update(Healthy) - - assert.Equal(t, Healthy, r.Status()) + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") + a1 := r.RegisterApp("app-1", "app") + a2 := r.RegisterApp("app-2", "app") + a3 := r.RegisterApp("other-1", "other") + + r1.Update(state.Healthy, "") + r2.Update(state.Healthy, "") + r3.Update(state.Healthy, "") + a1.Update(state.Healthy, "") + a2.Update(state.Healthy, "") + a3.Update(state.Healthy, "") + + assert.Equal(t, Healthy, r.StatusCode()) assert.Equal(t, "online", r.StatusString()) }) t.Run("degraded when one degraded", func(t *testing.T) { r := NewController(l) - r1 := r.Register("r1") - r2 := r.Register("r2") - r3 := r.Register("r3") + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") - r1.Update(Healthy) - r2.Update(Degraded) - r3.Update(Healthy) + r1.Update(state.Healthy, "") + r2.Update(state.Degraded, "degraded") + r3.Update(state.Healthy, "") - assert.Equal(t, Degraded, r.Status()) + assert.Equal(t, Degraded, r.StatusCode()) assert.Equal(t, "degraded", r.StatusString()) }) t.Run("failed when one failed", func(t *testing.T) { r := NewController(l) - r1 := r.Register("r1") - r2 := r.Register("r2") - r3 := r.Register("r3") + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") - r1.Update(Healthy) - r2.Update(Failed) - r3.Update(Healthy) + r1.Update(state.Healthy, "") + r2.Update(state.Failed, "failed") + r3.Update(state.Healthy, "") - assert.Equal(t, Failed, r.Status()) + assert.Equal(t, Failed, r.StatusCode()) assert.Equal(t, "error", r.StatusString()) }) t.Run("failed when one failed and one degraded", func(t *testing.T) { r := NewController(l) - r1 := r.Register("r1") - r2 := r.Register("r2") - r3 := r.Register("r3") + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") - r1.Update(Healthy) - r2.Update(Failed) - r3.Update(Degraded) + r1.Update(state.Healthy, "") + r2.Update(state.Failed, "failed") + r3.Update(state.Degraded, "degraded") - assert.Equal(t, Failed, r.Status()) + assert.Equal(t, Failed, r.StatusCode()) assert.Equal(t, "error", r.StatusString()) }) t.Run("degraded when degraded and healthy, failed unregistered", func(t *testing.T) { r := NewController(l) - r1 := r.Register("r1") - r2 := r.Register("r2") - r3 := r.Register("r3") + r1 := r.RegisterComponent("r1") + r2 := r.RegisterComponent("r2") + r3 := r.RegisterComponent("r3") - r1.Update(Healthy) - r2.Update(Failed) - r3.Update(Degraded) + r1.Update(state.Healthy, "") + r2.Update(state.Failed, "failed") + r3.Update(state.Degraded, "degraded") r2.Unregister() - assert.Equal(t, Degraded, r.Status()) + assert.Equal(t, Degraded, r.StatusCode()) assert.Equal(t, "degraded", r.StatusString()) }) } diff --git a/x-pack/elastic-agent/pkg/reporter/reporter.go b/x-pack/elastic-agent/pkg/reporter/reporter.go index 3b128841b2a..b1568e9f3f1 100644 --- a/x-pack/elastic-agent/pkg/reporter/reporter.go +++ b/x-pack/elastic-agent/pkg/reporter/reporter.go @@ -108,7 +108,7 @@ func generateRecord(agentID string, id string, name string, s state.State) event case state.Configuring: subType = EventSubTypeConfig subTypeText = EventSubTypeConfig - case state.Running: + case state.Healthy: subType = EventSubTypeRunning subTypeText = EventSubTypeRunning case state.Degraded: diff --git a/x-pack/elastic-agent/pkg/reporter/reporter_test.go b/x-pack/elastic-agent/pkg/reporter/reporter_test.go index ace35f0550b..8b0c095f654 100644 --- a/x-pack/elastic-agent/pkg/reporter/reporter_test.go +++ b/x-pack/elastic-agent/pkg/reporter/reporter_test.go @@ -62,11 +62,11 @@ func TestTypes(t *testing.T) { EventMessage: "Application: a-configuring[id]: State changed to CONFIG: Configuring", }, { - Status: state.Running, - StatusMessage: "Running", + Status: state.Healthy, + StatusMessage: "Healthy", EventType: EventTypeState, EventSubType: EventSubTypeRunning, - EventMessage: "Application: a-running[id]: State changed to RUNNING: Running", + EventMessage: "Application: a-healthy[id]: State changed to RUNNING: Healthy", }, { Status: state.Degraded, diff --git a/x-pack/elastic-agent/spec/fleet-server.yml b/x-pack/elastic-agent/spec/fleet-server.yml index 1c444f8990e..167bd9f8305 100644 --- a/x-pack/elastic-agent/spec/fleet-server.yml +++ b/x-pack/elastic-agent/spec/fleet-server.yml @@ -24,6 +24,10 @@ rules: selectors: [ fleet.server.output.elasticsearch ] path: output + - select_into: + selectors: [ fleet.server.policy.id ] + path: inputs.0.policy + - map: path: fleet rules: