diff --git a/.gitignore b/.gitignore index f990c862f53..3d7617dc217 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ *beat/build *beat/logs *beat/data +x-pack/functionbeat/pkg # Files .DS_Store @@ -21,6 +22,8 @@ coverage.out beat.db *.keystore mage_output_file.go +x-pack/functionbeat/*/fields.yml +x-pack/functionbeat/provider/*/functionbeat-* # Editor swap files *.swp diff --git a/dev-tools/mage/build.go b/dev-tools/mage/build.go index a227a473504..0c4b84657d5 100644 --- a/dev-tools/mage/build.go +++ b/dev-tools/mage/build.go @@ -33,6 +33,7 @@ import ( // "go build" is invoked. type BuildArgs struct { Name string // Name of binary. (On Windows '.exe' is appended.) + InputFiles []string OutputDir string CGO bool Static bool @@ -143,6 +144,10 @@ func Build(params BuildArgs) error { args = append(args, MustExpand(strings.Join(ldflags, " "))) } + if len(params.InputFiles) > 0 { + args = append(args, params.InputFiles...) + } + log.Println("Adding build environment vars:", env) return sh.RunWith(env, "go", args...) } diff --git a/dev-tools/mage/gotest.go b/dev-tools/mage/gotest.go index 2de7aa8dbae..b0b65142f10 100644 --- a/dev-tools/mage/gotest.go +++ b/dev-tools/mage/gotest.go @@ -53,6 +53,12 @@ type GoTestArgs struct { CoverageProfileFile string // Test coverage profile file (enables -cover). } +// TestBinaryArgs are the arguments used when building binary for testing. +type TestBinaryArgs struct { + Name string // Name of the binary to build + InputFiles []string +} + func makeGoTestArgs(name string) GoTestArgs { fileName := fmt.Sprintf("build/TEST-go-%s", strings.Replace(strings.ToLower(name), " ", "_", -1)) params := GoTestArgs{ @@ -80,6 +86,14 @@ func DefaultGoTestIntegrationArgs() GoTestArgs { return args } +// DefaultTestBinaryArgs returns the default arguments for building +// a binary for testing. +func DefaultTestBinaryArgs() TestBinaryArgs { + return TestBinaryArgs{ + Name: BeatName, + } +} + // GoTest invokes "go test" and reports the results to stdout. It returns an // error if there was any failure executing the tests or if there were any // test failures. @@ -329,15 +343,24 @@ func (s *GoTestSummary) String() string { return strings.TrimRight(b.String(), "\n") } -// BuildSystemTestBinary build a binary for testing that is instrumented for +// BuildSystemTestBinary runs BuildSystemTestGoBinary with default values. +func BuildSystemTestBinary() error { + return BuildSystemTestGoBinary(DefaultTestBinaryArgs()) +} + +// BuildSystemTestGoBinary build a binary for testing that is instrumented for // testing and measuring code coverage. The binary is only instrumented for // coverage when TEST_COVERAGE=true (default is false). -func BuildSystemTestBinary() error { +func BuildSystemTestGoBinary(binArgs TestBinaryArgs) error { args := []string{ "test", "-c", + "-o", binArgs.Name + ".test", } if TestCoverage { args = append(args, "-coverpkg", "./...") } + if len(binArgs.InputFiles) > 0 { + args = append(args, binArgs.InputFiles...) + } return sh.RunV("go", args...) } diff --git a/dev-tools/mage/pkgtypes.go b/dev-tools/mage/pkgtypes.go index 13e73bffb20..87ac75c8467 100644 --- a/dev-tools/mage/pkgtypes.go +++ b/dev-tools/mage/pkgtypes.go @@ -280,6 +280,10 @@ func (s PackageSpec) Clone() PackageSpec { for k, v := range s.Files { clone.Files[k] = v } + clone.ExtraVars = make(map[string]string, len(s.ExtraVars)) + for k, v := range s.ExtraVars { + clone.ExtraVars[k] = v + } return clone } @@ -343,6 +347,10 @@ func (s PackageSpec) Evaluate(args ...map[string]interface{}) PackageSpec { return MustExpand(in, args...) } + if s.evalContext == nil { + s.evalContext = map[string]interface{}{} + } + for k, v := range s.ExtraVars { s.evalContext[k] = mustExpand(v) } @@ -375,9 +383,6 @@ func (s PackageSpec) Evaluate(args ...map[string]interface{}) PackageSpec { } else { s.packageDir = filepath.Clean(mustExpand(s.packageDir)) } - if s.evalContext == nil { - s.evalContext = map[string]interface{}{} - } s.evalContext["PackageDir"] = s.packageDir evaluatedFiles := make(map[string]PackageFile, len(s.Files)) diff --git a/dev-tools/make/xpack.mk b/dev-tools/make/xpack.mk index 2930d27c0af..14fe27d165e 100644 --- a/dev-tools/make/xpack.mk +++ b/dev-tools/make/xpack.mk @@ -24,6 +24,8 @@ check: mage clean: mage mage clean +fix-permissions: + .PHONY: fmt fmt: mage mage fmt @@ -38,6 +40,8 @@ help: release: mage mage package +stop-environment: + .PHONY: testsuite testsuite: mage -rm build/TEST-go-integration.out diff --git a/libbeat/cmd/root.go b/libbeat/cmd/root.go index e13bd2a7e0e..e76a7ae9d90 100644 --- a/libbeat/cmd/root.go +++ b/libbeat/cmd/root.go @@ -76,7 +76,7 @@ func GenRootCmdWithSettings(beatCreator beat.Creator, settings instance.Settings rootCmd.TestCmd = genTestCmd(settings, beatCreator) rootCmd.SetupCmd = genSetupCmd(settings, beatCreator) rootCmd.KeystoreCmd = genKeystoreCmd(settings) - rootCmd.VersionCmd = genVersionCmd(settings) + rootCmd.VersionCmd = GenVersionCmd(settings) rootCmd.CompletionCmd = genCompletionCmd(settings, rootCmd) // Root command is an alias for run diff --git a/libbeat/cmd/version.go b/libbeat/cmd/version.go index 5660e627268..6145c52e707 100644 --- a/libbeat/cmd/version.go +++ b/libbeat/cmd/version.go @@ -28,7 +28,8 @@ import ( "github.com/elastic/beats/libbeat/version" ) -func genVersionCmd(settings instance.Settings) *cobra.Command { +// GenVersionCmd generates the command version for a Beat. +func GenVersionCmd(settings instance.Settings) *cobra.Command { return &cobra.Command{ Use: "version", Short: "Show current version info", diff --git a/x-pack/functionbeat/Makefile b/x-pack/functionbeat/Makefile index 876f451476c..fc92d913b7e 100644 --- a/x-pack/functionbeat/Makefile +++ b/x-pack/functionbeat/Makefile @@ -1,22 +1,11 @@ -BEAT_NAME?=functionbeat -LICENSE=Elastic -BEAT_TITLE?=Functionbeat -SYSTEM_TESTS?=true -BEAT_PATH?=github.com/elastic/beats/x-pack/${BEAT_NAME} -TEST_ENVIRONMENT?=true +# +# Variables +# GOX_FLAGS=-arch="amd64 386 arm ppc64 ppc64le" ES_BEATS?=../../ -FIELDS_FILE_PATH=module -XPACK_ONLY?=true -# Path to the libbeat Makefile -include $(ES_BEATS)/libbeat/scripts/Makefile +# +# Includes +# +include $(ES_BEATS)/dev-tools/make/xpack.mk -# Runs all collection steps and updates afterwards -.PHONY: collect -collect: - -# Generate an artifact to be push on serverless provider. -.PHONY: linux -linux: - GOOS=linux go build -o pkg/functionbeat diff --git a/x-pack/functionbeat/config/config.go b/x-pack/functionbeat/config/config.go index d1d2f66d91c..7ef3055c2c0 100644 --- a/x-pack/functionbeat/config/config.go +++ b/x-pack/functionbeat/config/config.go @@ -14,29 +14,28 @@ import ( "github.com/elastic/beats/libbeat/common" ) +// ConfigOverrides overrides the defaults provided by libbeat. var ( functionPattern = "^[A-Za-z][A-Za-z0-9\\-]{0,139}$" functionRE = regexp.MustCompile(functionPattern) + ConfigOverrides = common.MustNewConfigFrom(map[string]interface{}{ + "path.data": "/tmp", + "path.logs": "/tmp/logs", + "logging.to_stderr": true, + "logging.to_files": false, + "setup.template.enabled": true, + "queue.mem": map[string]interface{}{ + "events": "${output.elasticsearch.bulk_max_size}", + "flush.min_events": 10, + "flush.timeout": "0.01s", + }, + "output.elasticsearch.bulk_max_size": 50, + }) ) -// ConfigOverrides overrides the defaults provided by libbeat. -var ConfigOverrides = common.MustNewConfigFrom(map[string]interface{}{ - "path.data": "/tmp", - "path.logs": "/tmp/logs", - "logging.to_stderr": true, - "logging.to_files": false, - "setup.template.enabled": true, - "queue.mem": map[string]interface{}{ - "events": "${output.elasticsearch.bulk_max_size}", - "flush.min_events": 10, - "flush.timeout": "0.01s", - }, - "output.elasticsearch.bulk_max_size": 50, -}) - // Config default configuration for Functionbeat. type Config struct { - Provider *common.ConfigNamespace `config:"provider" validate:"required"` + Provider *common.Config `config:"provider" validate:"required"` } // ProviderConfig is a generic configured used by providers. diff --git a/x-pack/functionbeat/config/config_test.go b/x-pack/functionbeat/config/config_test.go index fb616b9af10..472e2aecf7e 100644 --- a/x-pack/functionbeat/config/config_test.go +++ b/x-pack/functionbeat/config/config_test.go @@ -2,6 +2,9 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +// Config is put into a different package to prevent cyclic imports in case +// it is needed in several locations + package config import ( diff --git a/x-pack/functionbeat/dev-tools/packaging/packages.yml b/x-pack/functionbeat/dev-tools/packaging/packages.yml index 2a844056262..882a03ddf34 100644 --- a/x-pack/functionbeat/dev-tools/packaging/packages.yml +++ b/x-pack/functionbeat/dev-tools/packaging/packages.yml @@ -41,9 +41,6 @@ shared: source: '{{.BeatName}}.yml' mode: 0600 config: true - kibana: - source: _meta/kibana.generated - mode: 0644 # Binary package spec (tar.gz for linux/darwin) - &binary_spec @@ -61,12 +58,12 @@ shared: source: '{{ repo.RootDir }}/licenses/ELASTIC-LICENSE.txt' mode: 0644 # - # Binaries used to run the function. + # Binaries used to run functions. # - &functionbeat_binaries files: - pkg/functionbeat: - source: 'build/golang-crossbuild/{{.BeatName}}-linux-amd64' + pkg/functionbeat-aws: + source: 'provider/aws/build/golang-crossbuild/aws-linux-amd64' mode: 0755 # specs is a list of named packaging "flavors". specs: diff --git a/x-pack/functionbeat/docker-compose.yml b/x-pack/functionbeat/docker-compose.yml index e9a81116b4d..e49a7cdac29 100644 --- a/x-pack/functionbeat/docker-compose.yml +++ b/x-pack/functionbeat/docker-compose.yml @@ -4,8 +4,6 @@ services: build: ${PWD}/. depends_on: - proxy_dep - env_file: - - ${PWD}/build/test.env working_dir: /go/src/github.com/elastic/beats/x-pack/functionbeat volumes: - ${PWD}/../..:/go/src/github.com/elastic/beats/ diff --git a/x-pack/functionbeat/beater/functionbeat.go b/x-pack/functionbeat/function/beater/functionbeat.go similarity index 88% rename from x-pack/functionbeat/beater/functionbeat.go rename to x-pack/functionbeat/function/beater/functionbeat.go index db0965e2834..34b91753db8 100644 --- a/x-pack/functionbeat/beater/functionbeat.go +++ b/x-pack/functionbeat/function/beater/functionbeat.go @@ -18,9 +18,8 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/x-pack/functionbeat/config" - "github.com/elastic/beats/x-pack/functionbeat/core" - _ "github.com/elastic/beats/x-pack/functionbeat/include" // imports features - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/core" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" "github.com/elastic/beats/x-pack/libbeat/licenser" ) @@ -52,7 +51,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { return nil, fmt.Errorf("error reading config file: %+v", err) } - provider, err := provider.NewProvider(c) + provider, err := getProvider(c.Provider) if err != nil { return nil, err } @@ -68,6 +67,23 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { return bt, nil } +func getProvider(cfg *common.Config) (provider.Provider, error) { + providers, err := provider.List() + if err != nil { + return nil, err + } + if len(providers) != 1 { + return nil, fmt.Errorf("too many providers are available, expected one, got: %s", providers) + } + + providerCfg, err := cfg.Child(providers[0], -1) + if err != nil { + return nil, err + } + + return provider.NewProvider(providers[0], providerCfg) +} + // Run starts functionbeat. func (bt *Functionbeat) Run(b *beat.Beat) error { defer bt.cancel() @@ -108,6 +124,7 @@ func (bt *Functionbeat) Run(b *beat.Beat) error { return nil } +// enabledFunctions returns the enabled function types func (bt *Functionbeat) enabledFunctions() (values []string) { raw, found := os.LookupEnv("ENABLED_FUNCTIONS") if !found { diff --git a/x-pack/functionbeat/function/cmd/root.go b/x-pack/functionbeat/function/cmd/root.go new file mode 100644 index 00000000000..554cecd1668 --- /dev/null +++ b/x-pack/functionbeat/function/cmd/root.go @@ -0,0 +1,54 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/cmd" + "github.com/elastic/beats/libbeat/cmd/instance" + "github.com/elastic/beats/x-pack/functionbeat/config" +) + +// FunctionCmd is the command of the function. +type FunctionCmd struct { + *cobra.Command + VersionCmd *cobra.Command +} + +// NewFunctionCmd return a new initialized function command. +func NewFunctionCmd(name string, beatCreator beat.Creator) *FunctionCmd { + settings := instance.Settings{ + Name: name, + IndexPrefix: name, + ConfigOverrides: config.ConfigOverrides, + } + + err := cfgfile.ChangeDefaultCfgfileFlag(settings.Name) + if err != nil { + panic(fmt.Errorf("failed to set default config file path: %v", err)) + } + + rootCmd := &FunctionCmd{ + &cobra.Command{ + Run: func(cmd *cobra.Command, args []string) { + err := instance.Run(settings, beatCreator) + if err != nil { + os.Exit(1) + } + }, + }, + cmd.GenVersionCmd(settings), + } + + rootCmd.AddCommand(rootCmd.VersionCmd) + + return rootCmd +} diff --git a/x-pack/functionbeat/core/coordinator.go b/x-pack/functionbeat/function/core/coordinator.go similarity index 100% rename from x-pack/functionbeat/core/coordinator.go rename to x-pack/functionbeat/function/core/coordinator.go diff --git a/x-pack/functionbeat/core/coordinator_test.go b/x-pack/functionbeat/function/core/coordinator_test.go similarity index 100% rename from x-pack/functionbeat/core/coordinator_test.go rename to x-pack/functionbeat/function/core/coordinator_test.go diff --git a/x-pack/functionbeat/core/license_client.go b/x-pack/functionbeat/function/core/license_client.go similarity index 100% rename from x-pack/functionbeat/core/license_client.go rename to x-pack/functionbeat/function/core/license_client.go diff --git a/x-pack/functionbeat/core/license_client_test.go b/x-pack/functionbeat/function/core/license_client_test.go similarity index 100% rename from x-pack/functionbeat/core/license_client_test.go rename to x-pack/functionbeat/function/core/license_client_test.go diff --git a/x-pack/functionbeat/core/sync_client.go b/x-pack/functionbeat/function/core/sync_client.go similarity index 100% rename from x-pack/functionbeat/core/sync_client.go rename to x-pack/functionbeat/function/core/sync_client.go diff --git a/x-pack/functionbeat/core/sync_client_test.go b/x-pack/functionbeat/function/core/sync_client_test.go similarity index 100% rename from x-pack/functionbeat/core/sync_client_test.go rename to x-pack/functionbeat/function/core/sync_client_test.go diff --git a/x-pack/functionbeat/provider/cli.go b/x-pack/functionbeat/function/provider/cli.go similarity index 100% rename from x-pack/functionbeat/provider/cli.go rename to x-pack/functionbeat/function/provider/cli.go diff --git a/x-pack/functionbeat/provider/default_provider.go b/x-pack/functionbeat/function/provider/default_provider.go similarity index 92% rename from x-pack/functionbeat/provider/default_provider.go rename to x-pack/functionbeat/function/provider/default_provider.go index 413e44fe24b..155565d3507 100644 --- a/x-pack/functionbeat/provider/default_provider.go +++ b/x-pack/functionbeat/function/provider/default_provider.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/x-pack/functionbeat/config" - "github.com/elastic/beats/x-pack/functionbeat/core" + "github.com/elastic/beats/x-pack/functionbeat/function/core" ) // DefaultProvider implements the minimal required to retrieve and start functions. @@ -79,6 +79,11 @@ func (d *DefaultProvider) TemplateBuilder() (TemplateBuilder, error) { return d.templateFactory(d.log, d.rawConfig, d) } +// EnabledFunctions return the list of enabled funcionts. +func (d *DefaultProvider) EnabledFunctions() ([]string, error) { + return EnabledFunctions(d.registry, d, d.config.Functions) +} + // nullCLI is used when a provider doesn't implement the CLI to manager functions on the service provider. type nullCLI struct{} @@ -99,6 +104,7 @@ func NewNullTemplateBuilder(_ *logp.Logger, _ *common.Config, _ Provider) (Templ return (*nullTemplateBuilder)(nil), nil } +// RawTemplate returns a empty string. func (*nullTemplateBuilder) RawTemplate(_ string) (string, error) { return "", fmt.Errorf("raw temaplate not implemented") } diff --git a/x-pack/functionbeat/provider/feature.go b/x-pack/functionbeat/function/provider/feature.go similarity index 97% rename from x-pack/functionbeat/provider/feature.go rename to x-pack/functionbeat/function/provider/feature.go index 59990d7a039..b5e56389fd2 100644 --- a/x-pack/functionbeat/provider/feature.go +++ b/x-pack/functionbeat/function/provider/feature.go @@ -4,7 +4,9 @@ package provider -import "github.com/elastic/beats/libbeat/feature" +import ( + "github.com/elastic/beats/libbeat/feature" +) // getNamespace return the namespace for functions of a specific provider. The registry have a flat view // representation of the plugin world this mean we don't really have a tree, instead what we do is diff --git a/x-pack/functionbeat/provider/feature_test.go b/x-pack/functionbeat/function/provider/feature_test.go similarity index 100% rename from x-pack/functionbeat/provider/feature_test.go rename to x-pack/functionbeat/function/provider/feature_test.go diff --git a/x-pack/functionbeat/provider/provider.go b/x-pack/functionbeat/function/provider/provider.go similarity index 73% rename from x-pack/functionbeat/provider/provider.go rename to x-pack/functionbeat/function/provider/provider.go index 2a7c6d013aa..43d08c9180c 100644 --- a/x-pack/functionbeat/provider/provider.go +++ b/x-pack/functionbeat/function/provider/provider.go @@ -13,8 +13,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/config" - "github.com/elastic/beats/x-pack/functionbeat/core" + "github.com/elastic/beats/x-pack/functionbeat/function/core" ) // Create a new pipeline client based on the function configuration. @@ -31,6 +30,7 @@ type Function interface { type Provider interface { CreateFunctions(clientFactory, []string) ([]core.Runner, error) FindFunctionByName(string) (Function, error) + EnabledFunctions() ([]string, error) CLIManager() (CLIManager, error) TemplateBuilder() (TemplateBuilder, error) Name() string @@ -60,19 +60,42 @@ func (r *Runnable) String() string { } // NewProvider return the provider specified in the configuration or an error. -func NewProvider(cfg *config.Config) (Provider, error) { +func NewProvider(name string, cfg *common.Config) (Provider, error) { // Configure the provider, the provider will take care of the configuration for the // functions. registry := NewRegistry(feature.GlobalRegistry()) - providerFunc, err := registry.Lookup(cfg.Provider.Name()) + providerFunc, err := registry.Lookup(name) if err != nil { - return nil, fmt.Errorf("error finding the provider '%s', error: %v", cfg.Provider.Name(), err) + return nil, fmt.Errorf("error finding the provider '%s', error: %v", name, err) } - provider, err := providerFunc(logp.NewLogger("provider"), registry, cfg.Provider.Config()) + provider, err := providerFunc(logp.NewLogger("provider"), registry, cfg) if err != nil { - return nil, fmt.Errorf("error creating the provider '%s', error: %v", cfg.Provider.Name(), err) + return nil, fmt.Errorf("error creating the provider '%s', error: %v", name, err) } return provider, nil } + +// IsAvailable checks if a cloud provider is available in the binary. +func IsAvailable(name string) (bool, error) { + registry := NewRegistry(feature.GlobalRegistry()) + + availableProviders, err := registry.AvailableProviders() + if err != nil { + return false, err + } + + for _, p := range availableProviders { + if p == name { + return true, nil + } + } + return false, nil +} + +// List returns the list of available providers. +func List() ([]string, error) { + registry := NewRegistry(feature.GlobalRegistry()) + return registry.AvailableProviders() +} diff --git a/x-pack/functionbeat/provider/provider_test.go b/x-pack/functionbeat/function/provider/provider_test.go similarity index 97% rename from x-pack/functionbeat/provider/provider_test.go rename to x-pack/functionbeat/function/provider/provider_test.go index 3e9c9c7a4eb..3fc3aa2a866 100644 --- a/x-pack/functionbeat/provider/provider_test.go +++ b/x-pack/functionbeat/function/provider/provider_test.go @@ -14,7 +14,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/x-pack/functionbeat/core" + "github.com/elastic/beats/x-pack/functionbeat/function/core" ) type simpleFunction struct { diff --git a/x-pack/functionbeat/provider/registry.go b/x-pack/functionbeat/function/provider/registry.go similarity index 84% rename from x-pack/functionbeat/provider/registry.go rename to x-pack/functionbeat/function/provider/registry.go index b36809bcf72..403cbea893e 100644 --- a/x-pack/functionbeat/provider/registry.go +++ b/x-pack/functionbeat/function/provider/registry.go @@ -12,7 +12,7 @@ import ( "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/x-pack/functionbeat/config" - "github.com/elastic/beats/x-pack/functionbeat/core" + "github.com/elastic/beats/x-pack/functionbeat/function/core" ) // Errors generated by the registry when we are retrieving providers or functions from the main registry. @@ -92,6 +92,21 @@ func (r *Registry) LookupFunction(provider, function string) (FunctionFactory, e return fn, nil } +// AvailableProviders returns the names of registered providers. +func (r *Registry) AvailableProviders() ([]string, error) { + providerFeatures, err := r.registry.LookupAll(namespace) + if err != nil { + return nil, err + } + + var providers []string + for _, f := range providerFeatures { + providers = append(providers, f.Name()) + } + + return providers, nil +} + // CreateFunctions create runnable function based on the configurations received. func CreateFunctions( registry *Registry, @@ -182,3 +197,21 @@ func FindFunctionByName( return nil, fmt.Errorf("no function with name '%s' exists", name) } + +// EnabledFunctions returns the list of enabled functions. +func EnabledFunctions(registry *Registry, provider Provider, configs []*common.Config) ([]string, error) { + var names []string + for _, cfg := range configs { + c := config.FunctionConfig{} + err := cfg.Unpack(&c) + if err != nil { + return names, fmt.Errorf("error while finding enabled functions: %+v", err) + } + + if c.Enabled { + names = append(names, c.Name.String()) + } + } + + return names, nil +} diff --git a/x-pack/functionbeat/provider/registry_test.go b/x-pack/functionbeat/function/provider/registry_test.go similarity index 97% rename from x-pack/functionbeat/provider/registry_test.go rename to x-pack/functionbeat/function/provider/registry_test.go index 847214c4a58..ad4b747232a 100644 --- a/x-pack/functionbeat/provider/registry_test.go +++ b/x-pack/functionbeat/function/provider/registry_test.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/core" + "github.com/elastic/beats/x-pack/functionbeat/function/core" ) type mockProvider struct { @@ -37,6 +37,8 @@ func (m *mockProvider) CLIManager() (CLIManager, error) { return nil, nil } func (m *mockProvider) TemplateBuilder() (TemplateBuilder, error) { return nil, nil } +func (m *mockProvider) EnabledFunctions() ([]string, error) { return []string{}, nil } + func TestRegistry(t *testing.T) { t.Run("provider", testProviderLookup) t.Run("functions", testFunctionLookup) diff --git a/x-pack/functionbeat/provider/template.go b/x-pack/functionbeat/function/provider/template.go similarity index 100% rename from x-pack/functionbeat/provider/template.go rename to x-pack/functionbeat/function/provider/template.go diff --git a/x-pack/functionbeat/include/feature.go b/x-pack/functionbeat/include/feature.go index 1a9ea1b80b6..7e73f7e950e 100644 --- a/x-pack/functionbeat/include/feature.go +++ b/x-pack/functionbeat/include/feature.go @@ -6,8 +6,8 @@ package include import ( "github.com/elastic/beats/libbeat/feature" - "github.com/elastic/beats/x-pack/functionbeat/provider/aws" - "github.com/elastic/beats/x-pack/functionbeat/provider/local" + "github.com/elastic/beats/x-pack/functionbeat/manager/aws" + "github.com/elastic/beats/x-pack/functionbeat/provider/local/local" ) // Bundle feature enabled. diff --git a/x-pack/functionbeat/magefile.go b/x-pack/functionbeat/magefile.go index 76ee6476251..91352c229ee 100644 --- a/x-pack/functionbeat/magefile.go +++ b/x-pack/functionbeat/magefile.go @@ -7,25 +7,55 @@ package main import ( - "context" "fmt" + "path/filepath" "time" "github.com/magefile/mage/mg" - "github.com/magefile/mage/sh" + + // mage:import + _ "github.com/elastic/beats/dev-tools/mage/target/common" + "github.com/elastic/beats/dev-tools/mage/target/unittest" + // mage:import + _ "github.com/elastic/beats/dev-tools/mage/target/pkg" + // mage:import + _ "github.com/elastic/beats/dev-tools/mage/target/integtest" + // mage:import + _ "github.com/elastic/beats/dev-tools/mage/target/unittest" devtools "github.com/elastic/beats/dev-tools/mage" functionbeat "github.com/elastic/beats/x-pack/functionbeat/scripts/mage" ) +var () + func init() { devtools.BeatDescription = "Functionbeat is a beat implementation for a serverless architecture." devtools.BeatLicense = "Elastic License" } -// Build builds the Beat binary. +// Build builds the Beat binary and functions by provider. func Build() error { - return devtools.Build(devtools.DefaultBuildArgs()) + params := devtools.DefaultBuildArgs() + + // Building functionbeat manager + err := devtools.Build(params) + if err != nil { + return err + } + + // Building functions to deploy + for _, provider := range functionbeat.SelectedProviders { + inputFiles := filepath.Join("provider", provider, "main.go") + params.InputFiles = []string{inputFiles} + params.Name = devtools.BeatName + "-" + provider + params.OutputDir = filepath.Join("provider", provider) + err := devtools.Build(params) + if err != nil { + return err + } + } + return nil } // GolangCrossBuild build the Beat binary inside of the golang-builder. @@ -41,7 +71,20 @@ func BuildGoDaemon() error { // CrossBuild cross-builds the beat for all target platforms. func CrossBuild() error { - return devtools.CrossBuild(devtools.AddPlatforms("linux/amd64")) + // Building functionbeat manager + err := devtools.CrossBuild() + if err != nil { + return err + } + + // Building functions to deploy + for _, provider := range functionbeat.SelectedProviders { + err := devtools.CrossBuild(devtools.AddPlatforms("linux/amd64"), devtools.InDir("x-pack", "functionbeat", "provider", provider)) + if err != nil { + return err + } + } + return nil } // CrossBuildGoDaemon cross-builds the go-daemon binary using Docker. @@ -49,10 +92,17 @@ func CrossBuildGoDaemon() error { return devtools.CrossBuildGoDaemon() } -// Clean cleans all generated files and build artifacts. -func Clean() error { - return devtools.Clean() -} +// Update is an alias for update:all. This is a workaround for +// https://github.com/magefile/mage/issues/217. +func Update() { mg.Deps(functionbeat.Update.All) } + +// Fields is an alias for update:fields. This is a workaround for +// https://github.com/magefile/mage/issues/217. +func Fields() { mg.Deps(functionbeat.Update.Fields) } + +// Config is an alias for update:config. This is a workaround for +// https://github.com/magefile/mage/issues/217. +func Config() { mg.Deps(functionbeat.Update.Config) } // Package packages the Beat for distribution. // Use SNAPSHOT=true to build snapshots. @@ -73,31 +123,32 @@ func TestPackages() error { return devtools.TestPackages() } -// Update updates the generated files (aka make update). -func Update() error { - return sh.Run("make", "update") -} - -// Fields generates a fields.yml for the Beat. -func Fields() error { - return devtools.GenerateFieldsYAML() -} - -// GoTestUnit executes the Go unit tests. -// Use TEST_COVERAGE=true to enable code coverage profiling. -// Use RACE_DETECTOR=true to enable the race detector. -func GoTestUnit(ctx context.Context) error { - return devtools.GoTest(ctx, devtools.DefaultGoTestUnitArgs()) -} - -// GoTestIntegration executes the Go integration tests. -// Use TEST_COVERAGE=true to enable code coverage profiling. -// Use RACE_DETECTOR=true to enable the race detector. -func GoTestIntegration(ctx context.Context) error { - return devtools.GoTest(ctx, devtools.DefaultGoTestIntegrationArgs()) +// GoTestUnit is an alias for goUnitTest. +func GoTestUnit() { + mg.Deps(unittest.GoUnitTest) } -// Config generates both the short and reference configs. -func Config() error { - return devtools.Config(devtools.ShortConfigType|devtools.ReferenceConfigType, functionbeat.XPackConfigFileParams(), ".") +// BuildSystemTestBinary build a binary for testing that is instrumented for +// testing and measuring code coverage. The binary is only instrumented for +// coverage when TEST_COVERAGE=true (default is false). +func BuildSystemTestBinary() error { + err := devtools.BuildSystemTestBinary() + if err != nil { + return err + } + + params := devtools.DefaultTestBinaryArgs() + for _, provider := range functionbeat.SelectedProviders { + params.Name = filepath.Join("provider", provider, devtools.BeatName+"-"+provider) + inputFiles := make([]string, 0) + for _, inputFileName := range []string{"main.go", "main_test.go"} { + inputFiles = append(inputFiles, filepath.Join("provider", provider, inputFileName)) + } + params.InputFiles = inputFiles + err := devtools.BuildSystemTestGoBinary(params) + if err != nil { + return err + } + } + return nil } diff --git a/x-pack/functionbeat/main.go b/x-pack/functionbeat/main.go index 7ab1bfdf226..1a1f8eecd67 100644 --- a/x-pack/functionbeat/main.go +++ b/x-pack/functionbeat/main.go @@ -7,7 +7,8 @@ package main import ( "os" - "github.com/elastic/beats/x-pack/functionbeat/cmd" + _ "github.com/elastic/beats/x-pack/functionbeat/include" // imports features + "github.com/elastic/beats/x-pack/functionbeat/manager/cmd" ) func main() { diff --git a/x-pack/functionbeat/main_test.go b/x-pack/functionbeat/main_test.go index 0475e667da8..1d1c2c79a21 100644 --- a/x-pack/functionbeat/main_test.go +++ b/x-pack/functionbeat/main_test.go @@ -10,7 +10,7 @@ import ( "flag" "testing" - "github.com/elastic/beats/x-pack/functionbeat/cmd" + "github.com/elastic/beats/x-pack/functionbeat/manager/cmd" ) var systemTest *bool diff --git a/x-pack/functionbeat/provider/aws/aws.go b/x-pack/functionbeat/manager/aws/aws.go similarity index 55% rename from x-pack/functionbeat/provider/aws/aws.go rename to x-pack/functionbeat/manager/aws/aws.go index 32d412690ec..e5dd717ae5f 100644 --- a/x-pack/functionbeat/provider/aws/aws.go +++ b/x-pack/functionbeat/manager/aws/aws.go @@ -6,7 +6,8 @@ package aws import ( "github.com/elastic/beats/libbeat/feature" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws" ) // Bundle exposes the trigger supported by the AWS provider. @@ -15,31 +16,15 @@ var Bundle = provider.MustCreate( provider.NewDefaultProvider("aws", NewCLI, NewTemplateBuilder), feature.NewDetails("AWS Lambda", "listen to events on AWS lambda", feature.Stable), ).MustAddFunction("cloudwatch_logs", - NewCloudwatchLogs, - feature.NewDetails( - "Cloudwatch Logs trigger", - "receive events from cloudwatch logs.", - feature.Stable, - ), + aws.NewCloudwatchLogs, + aws.CloudwatchLogsDetails(), ).MustAddFunction("api_gateway_proxy", - NewAPIGatewayProxy, - feature.NewDetails( - "API Gateway proxy trigger", - "receive events from the api gateway proxy", - feature.Experimental, - ), + aws.NewAPIGatewayProxy, + aws.APIGatewayProxyDetails(), ).MustAddFunction("kinesis", - NewKinesis, - feature.NewDetails( - "Kinesis trigger", - "receive events from a Kinesis stream", - feature.Stable, - ), + aws.NewKinesis, + aws.KinesisDetails(), ).MustAddFunction("sqs", - NewSQS, - feature.NewDetails( - "SQS trigger", - "receive events from a SQS queue", - feature.Stable, - ), + aws.NewSQS, + aws.SQSDetails(), ).Bundle() diff --git a/x-pack/functionbeat/provider/aws/cli_manager.go b/x-pack/functionbeat/manager/aws/cli_manager.go similarity index 91% rename from x-pack/functionbeat/provider/aws/cli_manager.go rename to x-pack/functionbeat/manager/aws/cli_manager.go index 45ab67c76fa..eae02ff019e 100644 --- a/x-pack/functionbeat/provider/aws/cli_manager.go +++ b/x-pack/functionbeat/manager/aws/cli_manager.go @@ -6,7 +6,6 @@ package aws import ( "fmt" - "regexp" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/external" @@ -16,18 +15,17 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" + fnaws "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws" ) const ( // AWS lambda currently support go 1.x as a runtime. runtime = "go1.x" - handlerName = "functionbeat" + handlerName = "functionbeat-aws" ) -// Chars for resource name anything else will be replaced. -var validChars = regexp.MustCompile("[^a-zA-Z0-9]") - // AWSLambdaFunction add 'dependsOn' as a serializable parameters, goformation doesn't currently // serialize this field. type AWSLambdaFunction struct { @@ -38,7 +36,7 @@ type AWSLambdaFunction struct { type installer interface { Policies() []cloudformation.AWSIAMRole_Policy Template() *cloudformation.Template - LambdaConfig() *lambdaConfig + LambdaConfig() *fnaws.LambdaConfig } // CLIManager interacts with the AWS Lambda API to deploy, update or remove a function. @@ -48,7 +46,7 @@ type CLIManager struct { templateBuilder *defaultTemplateBuilder awsCfg aws.Config log *logp.Logger - config *Config + config *fnaws.Config } // stackName cloudformation stack are unique per function. @@ -65,7 +63,7 @@ func (c *CLIManager) deployTemplate(update bool, name string) error { c.log.Debugf("Using cloudformation template:\n%s", templateData.json) svcCF := cf.New(c.awsCfg) - executer := newExecutor(c.log) + executer := executor.NewExecutor(c.log) executer.Add(newOpEnsureBucket(c.log, c.awsCfg, c.bucket())) executer.Add(newOpUploadToBucket( c.log, @@ -141,7 +139,7 @@ func (c *CLIManager) Remove(name string) error { defer c.log.Debugf("Removal of function '%s' complete", name) svc := cf.New(c.awsCfg) - executer := newExecutor(c.log) + executer := executor.NewExecutor(c.log) executer.Add(newOpDeleteCloudFormation(c.log, svc, c.stackName(name))) executer.Add(newWaitDeleteCloudFormation(c.log, c.awsCfg)) @@ -170,7 +168,7 @@ func NewCLI( return nil, err } - config := &Config{} + config := &fnaws.Config{} if err := cfg.Unpack(config); err != nil { return nil, err } @@ -192,7 +190,3 @@ func NewCLI( templateBuilder: templateBuilder, }, nil } - -func normalizeResourceName(s string) string { - return validChars.ReplaceAllString(s, "") -} diff --git a/x-pack/functionbeat/provider/aws/cli_manager_test.go b/x-pack/functionbeat/manager/aws/cli_manager_test.go similarity index 53% rename from x-pack/functionbeat/provider/aws/cli_manager_test.go rename to x-pack/functionbeat/manager/aws/cli_manager_test.go index ef7bd36c6b8..534971c4358 100644 --- a/x-pack/functionbeat/provider/aws/cli_manager_test.go +++ b/x-pack/functionbeat/manager/aws/cli_manager_test.go @@ -36,39 +36,3 @@ func TestChecksum(t *testing.T) { assert.NotEqual(t, checksum(content), checksum(other)) }) } - -func TestNormalize(t *testing.T) { - tests := []struct { - title string - candidate string - chars string - expected string - }{ - { - title: "when the string contains invalid chars", - candidate: "/var/log-alpha/tmp:ok", - expected: "varlogalphatmpok", - }, - { - title: "when we have an empty string", - candidate: "", - expected: "", - }, - { - title: "when we don't have any invalid chars", - candidate: "hello", - expected: "hello", - }, - { - title: "when the string contains underscore", - candidate: "/var/log-alpha/tmp:ok_moreok", - expected: "varlogalphatmpokmoreok", - }, - } - - for _, test := range tests { - t.Run(test.title, func(t *testing.T) { - assert.Equal(t, test.expected, normalizeResourceName(test.candidate)) - }) - } -} diff --git a/x-pack/functionbeat/provider/aws/event_stack_poller.go b/x-pack/functionbeat/manager/aws/event_stack_poller.go similarity index 99% rename from x-pack/functionbeat/provider/aws/event_stack_poller.go rename to x-pack/functionbeat/manager/aws/event_stack_poller.go index 3c57ca9d655..5813bc3a124 100644 --- a/x-pack/functionbeat/provider/aws/event_stack_poller.go +++ b/x-pack/functionbeat/manager/aws/event_stack_poller.go @@ -124,7 +124,7 @@ func (e *eventStackPoller) poll() { if nextToken == nil { return } - case <-time.After(periodicCheck): + case <-time.After(e.periodicCheck): } } } diff --git a/x-pack/functionbeat/provider/aws/event_stack_poller_test.go b/x-pack/functionbeat/manager/aws/event_stack_poller_test.go similarity index 100% rename from x-pack/functionbeat/provider/aws/event_stack_poller_test.go rename to x-pack/functionbeat/manager/aws/event_stack_poller_test.go diff --git a/x-pack/functionbeat/provider/aws/op_cloudformation.go b/x-pack/functionbeat/manager/aws/op_cloudformation.go similarity index 95% rename from x-pack/functionbeat/provider/aws/op_cloudformation.go rename to x-pack/functionbeat/manager/aws/op_cloudformation.go index 700a5d4c5f7..7103834ab9f 100644 --- a/x-pack/functionbeat/provider/aws/op_cloudformation.go +++ b/x-pack/functionbeat/manager/aws/op_cloudformation.go @@ -16,6 +16,7 @@ import ( "github.com/gofrs/uuid" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" ) type opCreateCloudFormation struct { @@ -38,7 +39,7 @@ func newOpCreateCloudFormation( } } -func (o *opCreateCloudFormation) Execute(ctx executionContext) error { +func (o *opCreateCloudFormation) Execute(ctx executor.Context) error { c, ok := ctx.(*stackContext) if !ok { return errWrongContext diff --git a/x-pack/functionbeat/provider/aws/op_cloudformation_test.go b/x-pack/functionbeat/manager/aws/op_cloudformation_test.go similarity index 100% rename from x-pack/functionbeat/provider/aws/op_cloudformation_test.go rename to x-pack/functionbeat/manager/aws/op_cloudformation_test.go diff --git a/x-pack/functionbeat/provider/aws/op_delete_cloudformation.go b/x-pack/functionbeat/manager/aws/op_delete_cloudformation.go similarity index 92% rename from x-pack/functionbeat/provider/aws/op_delete_cloudformation.go rename to x-pack/functionbeat/manager/aws/op_delete_cloudformation.go index b814960f6ef..3ee7c7c8682 100644 --- a/x-pack/functionbeat/provider/aws/op_delete_cloudformation.go +++ b/x-pack/functionbeat/manager/aws/op_delete_cloudformation.go @@ -13,6 +13,7 @@ import ( "github.com/gofrs/uuid" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" ) type opDeleteCloudFormation struct { @@ -21,7 +22,7 @@ type opDeleteCloudFormation struct { stackName string } -func (o *opDeleteCloudFormation) Execute(ctx executionContext) error { +func (o *opDeleteCloudFormation) Execute(ctx executor.Context) error { c, ok := ctx.(*stackContext) if !ok { return errWrongContext diff --git a/x-pack/functionbeat/provider/aws/op_delete_file_bucket.go b/x-pack/functionbeat/manager/aws/op_delete_file_bucket.go similarity index 90% rename from x-pack/functionbeat/provider/aws/op_delete_file_bucket.go rename to x-pack/functionbeat/manager/aws/op_delete_file_bucket.go index a97bf3a3402..38b00a93fb2 100644 --- a/x-pack/functionbeat/provider/aws/op_delete_file_bucket.go +++ b/x-pack/functionbeat/manager/aws/op_delete_file_bucket.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" ) type opDeleteFileBucket struct { @@ -33,7 +34,7 @@ func newOpDeleteFileBucket( } } -func (o *opDeleteFileBucket) Execute(_ executionContext) error { +func (o *opDeleteFileBucket) Execute(_ executor.Context) error { o.log.Debugf("Removing file '%s' on bucket '%s'", o.path, o.bucketName) input := &s3.DeleteObjectInput{ Bucket: aws.String(o.bucketName), diff --git a/x-pack/functionbeat/provider/aws/op_ensure_bucket.go b/x-pack/functionbeat/manager/aws/op_ensure_bucket.go similarity index 93% rename from x-pack/functionbeat/provider/aws/op_ensure_bucket.go rename to x-pack/functionbeat/manager/aws/op_ensure_bucket.go index e0f9ce5195d..304127ac151 100644 --- a/x-pack/functionbeat/provider/aws/op_ensure_bucket.go +++ b/x-pack/functionbeat/manager/aws/op_ensure_bucket.go @@ -13,6 +13,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" ) // This error is not provided by the S3 error package. @@ -28,7 +29,7 @@ func newOpEnsureBucket(log *logp.Logger, cfg aws.Config, bucketName string) *opE return &opEnsureBucket{log: log, svc: s3.New(cfg), bucketName: bucketName} } -func (o *opEnsureBucket) Execute(_ executionContext) error { +func (o *opEnsureBucket) Execute(_ executor.Context) error { o.log.Debugf("Verifying presence of S3 bucket: %s", o.bucketName) check := &s3.HeadBucketInput{Bucket: aws.String(o.bucketName)} diff --git a/x-pack/functionbeat/provider/aws/op_update_cloudformation.go b/x-pack/functionbeat/manager/aws/op_update_cloudformation.go similarity index 92% rename from x-pack/functionbeat/provider/aws/op_update_cloudformation.go rename to x-pack/functionbeat/manager/aws/op_update_cloudformation.go index b7c768a930c..315ed6d8900 100644 --- a/x-pack/functionbeat/provider/aws/op_update_cloudformation.go +++ b/x-pack/functionbeat/manager/aws/op_update_cloudformation.go @@ -13,6 +13,7 @@ import ( "github.com/gofrs/uuid" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" ) type opUpdateCloudFormation struct { @@ -22,7 +23,7 @@ type opUpdateCloudFormation struct { stackName string } -func (o *opUpdateCloudFormation) Execute(ctx executionContext) error { +func (o *opUpdateCloudFormation) Execute(ctx executor.Context) error { c, ok := ctx.(*stackContext) if !ok { return errWrongContext diff --git a/x-pack/functionbeat/provider/aws/op_upload_to_bucket.go b/x-pack/functionbeat/manager/aws/op_upload_to_bucket.go similarity index 89% rename from x-pack/functionbeat/provider/aws/op_upload_to_bucket.go rename to x-pack/functionbeat/manager/aws/op_upload_to_bucket.go index 4c7c37274d4..0064d53d559 100644 --- a/x-pack/functionbeat/provider/aws/op_upload_to_bucket.go +++ b/x-pack/functionbeat/manager/aws/op_upload_to_bucket.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" ) type opUploadToBucket struct { @@ -39,7 +40,7 @@ func newOpUploadToBucket( } } -func (o *opUploadToBucket) Execute(_ executionContext) error { +func (o *opUploadToBucket) Execute(_ executor.Context) error { o.log.Debugf("Uploading file '%s' to bucket '%s' with size %d bytes", o.path, o.bucketName, len(o.raw)) input := &s3.PutObjectInput{ Bucket: aws.String(o.bucketName), @@ -57,7 +58,7 @@ func (o *opUploadToBucket) Execute(_ executionContext) error { return nil } -func (o *opUploadToBucket) Rollback(ctx executionContext) error { +func (o *opUploadToBucket) Rollback(ctx executor.Context) error { // The error will be logged but we do not enforce a hard failure because the file could have // been removed before. err := newOpDeleteFileBucket(o.log, o.config, o.bucketName, o.path).Execute(ctx) diff --git a/x-pack/functionbeat/provider/aws/op_wait_cloud_formation.go b/x-pack/functionbeat/manager/aws/op_wait_cloud_formation.go similarity index 96% rename from x-pack/functionbeat/provider/aws/op_wait_cloud_formation.go rename to x-pack/functionbeat/manager/aws/op_wait_cloud_formation.go index 040a9a69952..28178479694 100644 --- a/x-pack/functionbeat/provider/aws/op_wait_cloud_formation.go +++ b/x-pack/functionbeat/manager/aws/op_wait_cloud_formation.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/cloudformation/cloudformationiface" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/manager/executor" ) var periodicCheck = 2 * time.Second @@ -48,7 +49,7 @@ func newWaitDeleteCloudFormation( } } -func (o *opWaitCloudFormation) Execute(ctx executionContext) error { +func (o *opWaitCloudFormation) Execute(ctx executor.Context) error { c, ok := ctx.(*stackContext) if !ok { return errWrongContext diff --git a/x-pack/functionbeat/manager/aws/policies_test.go b/x-pack/functionbeat/manager/aws/policies_test.go new file mode 100644 index 00000000000..fba1111c80f --- /dev/null +++ b/x-pack/functionbeat/manager/aws/policies_test.go @@ -0,0 +1,70 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package aws + +import ( + "testing" + + "github.com/awslabs/goformation/cloudformation" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + fnaws "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws" +) + +func TestConfig(t *testing.T) { + t.Run("test permissions for event_source_arn", testPolicies) +} + +func testPolicies(t *testing.T) { + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "name": "myfunction", + "description": "mydescription", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc456", + }, + map[string]interface{}{ + "event_source_arn": "abc1234", + }, + }, + }) + + k, err := fnaws.NewKinesis(&provider.DefaultProvider{}, cfg) + if !assert.NoError(t, err) { + return + } + + i, ok := k.(installer) + if !assert.True(t, ok) { + return + } + + policies := i.Policies() + if !assert.Equal(t, 1, len(policies)) { + return + } + + // ensure permissions on specified resources + expected := cloudformation.AWSIAMRole_Policy{ + PolicyName: cloudformation.Join("-", []string{"fnb", "kinesis", "myfunction"}), + PolicyDocument: map[string]interface{}{ + "Statement": []map[string]interface{}{ + map[string]interface{}{ + "Action": []string{ + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "Kinesis:DescribeStream", + }, + "Effect": "Allow", + "Resource": []string{"abc1234", "abc456"}, + }, + }, + }, + } + + assert.Equal(t, expected, policies[0]) +} diff --git a/x-pack/functionbeat/provider/aws/stack_context.go b/x-pack/functionbeat/manager/aws/stack_context.go similarity index 100% rename from x-pack/functionbeat/provider/aws/stack_context.go rename to x-pack/functionbeat/manager/aws/stack_context.go diff --git a/x-pack/functionbeat/provider/aws/template_builder.go b/x-pack/functionbeat/manager/aws/template_builder.go similarity index 96% rename from x-pack/functionbeat/provider/aws/template_builder.go rename to x-pack/functionbeat/manager/aws/template_builder.go index de845af152e..28df63fb7cf 100644 --- a/x-pack/functionbeat/provider/aws/template_builder.go +++ b/x-pack/functionbeat/manager/aws/template_builder.go @@ -14,8 +14,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/core" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/manager/core" + fnaws "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws" ) // zipData stores the data on the zip to be deployed @@ -46,7 +47,7 @@ const ( ) func NewTemplateBuilder(log *logp.Logger, cfg *common.Config, p provider.Provider) (provider.TemplateBuilder, error) { - config := &Config{} + config := &fnaws.Config{} if err := cfg.Unpack(config); err != nil { return nil, err } @@ -76,7 +77,7 @@ func (d *defaultTemplateBuilder) findFunction(name string) (installer, error) { // execute generates a template func (d *defaultTemplateBuilder) execute(name string) (templateData, error) { d.log.Debug("Compressing all assets into an artifact") - content, err := core.MakeZip() + content, err := core.MakeZip("aws") if err != nil { return templateData{}, err } @@ -122,7 +123,7 @@ func (d *defaultTemplateBuilder) template(function installer, name, codeLoc stri lambdaConfig := function.LambdaConfig() prefix := func(s string) string { - return normalizeResourceName("fnb" + name + s) + return fnaws.NormalizeResourceName("fnb" + name + s) } // AWS variables references:. diff --git a/x-pack/functionbeat/manager/beater/functionbeat.go b/x-pack/functionbeat/manager/beater/functionbeat.go new file mode 100644 index 00000000000..9c1c1cd6de7 --- /dev/null +++ b/x-pack/functionbeat/manager/beater/functionbeat.go @@ -0,0 +1,58 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package beater + +import ( + "fmt" + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/x-pack/functionbeat/config" +) + +var ( + graceDelay = 45 * time.Minute + refreshDelay = 15 * time.Minute +) + +// Functionbeat is a beat designed to run under a serverless environment and listen to external triggers, +// each invocation will generate one or more events to Elasticsearch. +// +// Each serverless implementation is different but functionbeat follows a few execution rules. +// - Publishing events from the source to the output is done synchronously. +// - Execution can be suspended. +// - Run on a read only filesystem +// - More execution constraints based on speed and memory usage. +type Functionbeat struct { + log *logp.Logger + Config *config.Config +} + +// New creates an instance of functionbeat. +func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { + c := &config.DefaultConfig + if err := cfg.Unpack(c); err != nil { + return nil, fmt.Errorf("error reading config file: %+v", err) + } + + bt := &Functionbeat{ + log: logp.NewLogger("functionbeat"), + Config: c, + } + return bt, nil +} + +// Run starts functionbeat. +func (bt *Functionbeat) Run(b *beat.Beat) error { + bt.log.Info("Functionbeat is running") + defer bt.log.Info("Functionbeat stopped running") + + return nil +} + +// Stop stops Functionbeat. +func (bt *Functionbeat) Stop() {} diff --git a/x-pack/functionbeat/cmd/cli_handler.go b/x-pack/functionbeat/manager/cmd/cli_handler.go similarity index 53% rename from x-pack/functionbeat/cmd/cli_handler.go rename to x-pack/functionbeat/manager/cmd/cli_handler.go index c84d6ba3d13..5ec3771896b 100644 --- a/x-pack/functionbeat/cmd/cli_handler.go +++ b/x-pack/functionbeat/manager/cmd/cli_handler.go @@ -12,8 +12,8 @@ import ( "strings" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/core" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/manager/core" ) // Errors generated by the cliHandler. @@ -32,14 +32,22 @@ var ( // TODO(ph) functions could be merged into a single call , but I thought it was premature to do // it. type cliHandler struct { - cli provider.CLIManager + clis map[string]provider.CLIManager log *logp.Logger errOutput io.Writer output io.Writer + + functionsByProvider map[string]string } -func newCLIHandler(cli provider.CLIManager, errOutput io.Writer, output io.Writer) *cliHandler { - return &cliHandler{cli: cli, errOutput: errOutput, output: output, log: logp.NewLogger("cli-handler")} +func newCLIHandler(clis map[string]provider.CLIManager, functionsByProvider map[string]string, errOutput io.Writer, output io.Writer) *cliHandler { + return &cliHandler{ + clis: clis, + errOutput: errOutput, + functionsByProvider: functionsByProvider, + output: output, + log: logp.NewLogger("cli-handler"), + } } func (c *cliHandler) Deploy(names []string) error { @@ -50,15 +58,7 @@ func (c *cliHandler) Deploy(names []string) error { return errNoFunctionGiven } - errCount := 0 - for _, name := range names { - if err := c.cli.Deploy(name); err != nil { - fmt.Fprintf(c.errOutput, "Function: %s, could not deploy, error: %s\n", name, err) - errCount++ - continue - } - fmt.Fprintf(c.output, "Function: %s, deploy successful\n", name) - } + errCount := c.iterateCLIFunc(names, "deploy", provider.CLIManager.Deploy) if errCount > 0 { return fmt.Errorf("Fail to deploy %d function(s)", errCount) @@ -74,18 +74,10 @@ func (c *cliHandler) Update(names []string) error { return errNoFunctionGiven } - errCount := 0 - for _, name := range names { - if err := c.cli.Update(name); err != nil { - fmt.Fprintf(c.errOutput, "Function: %s, could not update, error: %s\n", name, err) - errCount++ - continue - } - fmt.Fprintf(c.output, "Function: %s, update successful\n", name) - } + errCount := c.iterateCLIFunc(names, "update", provider.CLIManager.Update) if errCount > 0 { - return fmt.Errorf("fail to deploy %d function(s)", errCount) + return fmt.Errorf("fail to update %d function(s)", errCount) } return nil } @@ -98,15 +90,7 @@ func (c *cliHandler) Remove(names []string) error { return errNoFunctionGiven } - errCount := 0 - for _, name := range names { - if err := c.cli.Remove(name); err != nil { - fmt.Fprintf(c.errOutput, "Function: %s, could not remove, error: %s\n", name, err) - errCount++ - continue - } - fmt.Fprintf(c.output, "Function: %s, remove successful\n", name) - } + errCount := c.iterateCLIFunc(names, "remove", provider.CLIManager.Remove) if errCount > 0 { return fmt.Errorf("fail to remove %d function(s)", errCount) @@ -115,17 +99,49 @@ func (c *cliHandler) Remove(names []string) error { } // TODO(ph) check current path and option flag for cobra -func (c *cliHandler) BuildPackage(output string) error { - content, err := core.MakeZip() - if err != nil { - return err - } +func (c *cliHandler) BuildPackage(outputPattern string) error { + for providerName := range c.clis { + content, err := core.MakeZip(providerName) + if err != nil { + return err + } - err = ioutil.WriteFile(output, content, 0644) - if err != nil { - return err - } + output := strings.ReplaceAll(outputPattern, "{{.Provider}}", providerName) + err = ioutil.WriteFile(output, content, 0644) + if err != nil { + return err + } - fmt.Fprintf(c.output, "Generated package at: %s\n", output) + fmt.Fprintf(c.output, "Generated package for provider %s at: %s\n", providerName, output) + } return nil } + +func (c *cliHandler) iterateCLIFunc(names []string, operation string, f func(provider.CLIManager, string) error) int { + errCount := 0 + for _, name := range names { + providerName, ok := c.functionsByProvider[name] + if !ok { + fmt.Fprintf(c.errOutput, "Function: %s, could not be %s. Enable it.", name, operation) + errCount++ + continue + } + + cli, ok := c.clis[providerName] + if !ok { + fmt.Fprintf(c.errOutput, "Function: %s, could not be %s. Selected provider '%s' cannot be found", name, operation, providerName) + errCount++ + continue + } + + err := f(cli, name) + if err != nil { + fmt.Fprintf(c.errOutput, "Function: %s, could not %s, error: %s\n", name, operation, err) + errCount++ + continue + } + + fmt.Fprintf(c.output, "Function: %s, %s successful\n", name, operation) + } + return errCount +} diff --git a/x-pack/functionbeat/cmd/cli_handler_test.go b/x-pack/functionbeat/manager/cmd/cli_handler_test.go similarity index 74% rename from x-pack/functionbeat/cmd/cli_handler_test.go rename to x-pack/functionbeat/manager/cmd/cli_handler_test.go index c0918ce41ce..745e90a6726 100644 --- a/x-pack/functionbeat/cmd/cli_handler_test.go +++ b/x-pack/functionbeat/manager/cmd/cli_handler_test.go @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + + "github.com/elastic/beats/x-pack/functionbeat/function/provider" ) type mockCLIManager struct { @@ -39,6 +41,19 @@ func outputs() (io.Writer, io.Writer) { return errOut, output } +func functionByProvider() map[string]string { + return map[string]string{ + "super": "mockProvider", + "saiyajin": "mockProvider", + } +} + +func wrapCLIManager(m provider.CLIManager) map[string]provider.CLIManager { + return map[string]provider.CLIManager{ + "mockProvider": m, + } +} + func TestCliHandler(t *testing.T) { t.Run("deploy", testDeploy) t.Run("update", testUpdate) @@ -48,7 +63,7 @@ func TestCliHandler(t *testing.T) { func testDeploy(t *testing.T) { t.Run("return error when no functions are specified", func(t *testing.T) { errOut, output := outputs() - handler := newCLIHandler(&mockCLIManager{}, errOut, output) + handler := newCLIHandler(wrapCLIManager(&mockCLIManager{}), functionByProvider(), errOut, output) err := handler.Deploy([]string{}) assert.Equal(t, errNoFunctionGiven, err) }) @@ -58,7 +73,7 @@ func testDeploy(t *testing.T) { myErr := errors.New("my error") m := &mockCLIManager{} m.On("Deploy", "saiyajin").Return(myErr) - handler := newCLIHandler(m, errOut, output) + handler := newCLIHandler(wrapCLIManager(m), functionByProvider(), errOut, output) err := handler.Deploy([]string{"saiyajin"}) assert.Error(t, err) }) @@ -68,7 +83,7 @@ func testDeploy(t *testing.T) { m := &mockCLIManager{} m.On("Deploy", "super").Return(nil) m.On("Deploy", "saiyajin").Return(nil) - handler := newCLIHandler(m, errOut, output) + handler := newCLIHandler(wrapCLIManager(m), functionByProvider(), errOut, output) err := handler.Deploy([]string{"super", "saiyajin"}) assert.NoError(t, err) m.AssertExpectations(t) @@ -78,7 +93,7 @@ func testDeploy(t *testing.T) { func testUpdate(t *testing.T) { t.Run("return error when no functions are specified", func(t *testing.T) { errOut, output := outputs() - handler := newCLIHandler(&mockCLIManager{}, errOut, output) + handler := newCLIHandler(wrapCLIManager(&mockCLIManager{}), functionByProvider(), errOut, output) err := handler.Update([]string{}) assert.Equal(t, errNoFunctionGiven, err) }) @@ -88,7 +103,7 @@ func testUpdate(t *testing.T) { myErr := errors.New("my error") m := &mockCLIManager{} m.On("Update", "saiyajin").Return(myErr) - handler := newCLIHandler(m, errOut, output) + handler := newCLIHandler(wrapCLIManager(m), functionByProvider(), errOut, output) err := handler.Update([]string{"saiyajin"}) assert.Error(t, err) }) @@ -98,7 +113,7 @@ func testUpdate(t *testing.T) { m := &mockCLIManager{} m.On("Update", "super").Return(nil) m.On("Update", "saiyajin").Return(nil) - handler := newCLIHandler(m, errOut, output) + handler := newCLIHandler(wrapCLIManager(m), functionByProvider(), errOut, output) err := handler.Update([]string{"super", "saiyajin"}) assert.NoError(t, err) m.AssertExpectations(t) @@ -108,7 +123,7 @@ func testUpdate(t *testing.T) { func testRemove(t *testing.T) { t.Run("return error when no functions are specified", func(t *testing.T) { errOut, output := outputs() - handler := newCLIHandler(&mockCLIManager{}, errOut, output) + handler := newCLIHandler(wrapCLIManager(&mockCLIManager{}), functionByProvider(), errOut, output) err := handler.Remove([]string{}) assert.Equal(t, errNoFunctionGiven, err) }) @@ -118,7 +133,7 @@ func testRemove(t *testing.T) { myErr := errors.New("my error") m := &mockCLIManager{} m.On("Remove", "saiyajin").Return(myErr) - handler := newCLIHandler(m, errOut, output) + handler := newCLIHandler(wrapCLIManager(m), functionByProvider(), errOut, output) err := handler.Remove([]string{"saiyajin"}) assert.Error(t, err) }) @@ -128,7 +143,7 @@ func testRemove(t *testing.T) { m := &mockCLIManager{} m.On("Remove", "super").Return(nil) m.On("Remove", "saiyajin").Return(nil) - handler := newCLIHandler(m, errOut, output) + handler := newCLIHandler(wrapCLIManager(m), functionByProvider(), errOut, output) err := handler.Remove([]string{"super", "saiyajin"}) assert.NoError(t, err) m.AssertExpectations(t) diff --git a/x-pack/functionbeat/cmd/provider_cmd.go b/x-pack/functionbeat/manager/cmd/provider_cmd.go similarity index 56% rename from x-pack/functionbeat/cmd/provider_cmd.go rename to x-pack/functionbeat/manager/cmd/provider_cmd.go index f740ca3c146..acbaad2e668 100644 --- a/x-pack/functionbeat/cmd/provider_cmd.go +++ b/x-pack/functionbeat/manager/cmd/provider_cmd.go @@ -14,12 +14,12 @@ import ( "github.com/elastic/beats/libbeat/cmd/instance" "github.com/elastic/beats/libbeat/common/cli" "github.com/elastic/beats/x-pack/functionbeat/config" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" ) var output string -func initProvider() (provider.Provider, error) { +func initProviders() ([]provider.Provider, error) { b, err := instance.NewInitializedBeat(instance.Settings{Name: Name}) if err != nil { return nil, err @@ -35,22 +35,55 @@ func initProvider() (provider.Provider, error) { return nil, err } - return provider.NewProvider(cfg) + var providers []provider.Provider + for _, p := range cfg.Provider.GetFields() { + isAvailable, err := provider.IsAvailable(p) + if err != nil { + return nil, err + } + if !isAvailable { + continue + } + + providerCfg, err := cfg.Provider.Child(p, -1) + if err != nil { + return nil, err + } + provider, err := provider.NewProvider(p, providerCfg) + if err != nil { + return nil, err + } + providers = append(providers, provider) + } + + return providers, nil } -// TODO: Add List() subcommand. func handler() (*cliHandler, error) { - provider, err := initProvider() + providers, err := initProviders() if err != nil { return nil, err } - cli, err := provider.CLIManager() - if err != nil { - return nil, err + clis := make(map[string]provider.CLIManager) + functionsByProvider := make(map[string]string) + for _, provider := range providers { + cli, err := provider.CLIManager() + if err != nil { + return nil, err + } + clis[provider.Name()] = cli + + enabledFunctions, err := provider.EnabledFunctions() + if err != nil { + return nil, err + } + + for _, f := range enabledFunctions { + functionsByProvider[f] = provider.Name() + } } - handler := newCLIHandler(cli, os.Stdout, os.Stderr) - return handler, nil + return newCLIHandler(clis, functionsByProvider, os.Stdout, os.Stderr), nil } func genCLICmd(use, short string, fn func(*cliHandler, []string) error) *cobra.Command { @@ -62,6 +95,7 @@ func genCLICmd(use, short string, fn func(*cliHandler, []string) error) *cobra.C if err != nil { return err } + return fn(h, args) }), } @@ -89,19 +123,17 @@ func genPackageCmd() *cobra.Command { return err } - if len(output) == 0 { - dir, err := os.Getwd() - if err != nil { - return err - } - - output = filepath.Join(dir, "package.zip") - } - return h.BuildPackage(output) }), } - cmd.Flags().StringVarP(&output, "output", "o", "", "full path to the package") + + dir, err := os.Getwd() + if err != nil { + panic(err) + } + + defaultOutput := filepath.Join(dir, "package-{{.Provider}}.zip") + cmd.Flags().StringVarP(&output, "output", "o", defaultOutput, "full path pattern to the package") return cmd } @@ -110,20 +142,23 @@ func genExportFunctionCmd() *cobra.Command { Use: "function", Short: "Export function template", Run: cli.RunWith(func(_ *cobra.Command, args []string) error { - p, err := initProvider() - if err != nil { - return err - } - builder, err := p.TemplateBuilder() + providers, err := initProviders() if err != nil { return err } - for _, name := range args { - template, err := builder.RawTemplate(name) + + for _, p := range providers { + builder, err := p.TemplateBuilder() if err != nil { - return fmt.Errorf("error generating raw template for %s: %+v", name, err) + return err + } + for _, name := range args { + template, err := builder.RawTemplate(name) + if err != nil { + return fmt.Errorf("error generating raw template for %s: %+v", name, err) + } + fmt.Println(template) } - fmt.Println(template) } return nil }), diff --git a/x-pack/functionbeat/cmd/root.go b/x-pack/functionbeat/manager/cmd/root.go similarity index 83% rename from x-pack/functionbeat/cmd/root.go rename to x-pack/functionbeat/manager/cmd/root.go index c10b9315ffa..a88286d16c0 100644 --- a/x-pack/functionbeat/cmd/root.go +++ b/x-pack/functionbeat/manager/cmd/root.go @@ -7,8 +7,7 @@ package cmd import ( cmd "github.com/elastic/beats/libbeat/cmd" "github.com/elastic/beats/libbeat/cmd/instance" - "github.com/elastic/beats/x-pack/functionbeat/beater" - "github.com/elastic/beats/x-pack/functionbeat/config" + "github.com/elastic/beats/x-pack/functionbeat/manager/beater" ) // Name of this beat @@ -19,8 +18,7 @@ var RootCmd *cmd.BeatsRootCmd func init() { RootCmd = cmd.GenRootCmdWithSettings(beater.New, instance.Settings{ - Name: Name, - ConfigOverrides: config.ConfigOverrides, + Name: Name, }) RootCmd.AddCommand(genDeployCmd()) diff --git a/x-pack/functionbeat/core/bundle/bundle.go b/x-pack/functionbeat/manager/core/bundle/bundle.go similarity index 100% rename from x-pack/functionbeat/core/bundle/bundle.go rename to x-pack/functionbeat/manager/core/bundle/bundle.go diff --git a/x-pack/functionbeat/core/bundle/bundle_test.go b/x-pack/functionbeat/manager/core/bundle/bundle_test.go similarity index 100% rename from x-pack/functionbeat/core/bundle/bundle_test.go rename to x-pack/functionbeat/manager/core/bundle/bundle_test.go diff --git a/x-pack/functionbeat/core/bundle/testdata/lipsum.txt b/x-pack/functionbeat/manager/core/bundle/testdata/lipsum.txt similarity index 100% rename from x-pack/functionbeat/core/bundle/testdata/lipsum.txt rename to x-pack/functionbeat/manager/core/bundle/testdata/lipsum.txt diff --git a/x-pack/functionbeat/core/makezip.go b/x-pack/functionbeat/manager/core/makezip.go similarity index 92% rename from x-pack/functionbeat/core/makezip.go rename to x-pack/functionbeat/manager/core/makezip.go index 5ac5127c646..86eda7764b7 100644 --- a/x-pack/functionbeat/core/makezip.go +++ b/x-pack/functionbeat/manager/core/makezip.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/keystore" "github.com/elastic/beats/x-pack/functionbeat/config" - "github.com/elastic/beats/x-pack/functionbeat/core/bundle" + "github.com/elastic/beats/x-pack/functionbeat/manager/core/bundle" ) // Package size limits for function providers, we should be a lot under this limit but @@ -45,7 +45,7 @@ func rawYaml() ([]byte, error) { } // MakeZip creates a zip from the the current artifacts and the currently available configuration. -func MakeZip() ([]byte, error) { +func MakeZip(provider string) ([]byte, error) { rawConfig, err := rawYaml() if err != nil { return nil, err @@ -53,7 +53,7 @@ func MakeZip() ([]byte, error) { resources := []bundle.Resource{ &bundle.MemoryFile{Path: "functionbeat.yml", Raw: rawConfig, FileMode: 0766}, - &bundle.LocalFile{Path: "pkg/functionbeat", FileMode: 0755}, + &bundle.LocalFile{Path: "pkg/functionbeat-" + provider, FileMode: 0755}, } rawKeystore, err := keystoreRaw() diff --git a/x-pack/functionbeat/provider/aws/executor.go b/x-pack/functionbeat/manager/executor/executor.go similarity index 53% rename from x-pack/functionbeat/provider/aws/executor.go rename to x-pack/functionbeat/manager/executor/executor.go index ec5259efa6f..8cbc0344b22 100644 --- a/x-pack/functionbeat/provider/aws/executor.go +++ b/x-pack/functionbeat/manager/executor/executor.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package aws +package executor import ( "errors" @@ -11,14 +11,19 @@ import ( ) var ( - errNeverRun = errors.New("executor was never executed") - errCannotAdd = errors.New("cannot add to an already executed executor") - errAlreadyExecuted = errors.New("executor already executed") + // ErrNeverRun is returned if the step was never run. + ErrNeverRun = errors.New("executor was never executed") + // ErrCannotAdd is returned if the executor had already ran and a new operation is added. + ErrCannotAdd = errors.New("cannot add to an already executed executor") + // ErrAlreadyExecuted is returned if it has already run. + ErrAlreadyExecuted = errors.New("executor already executed") ) -type executionContext interface{} +// Context holds the information of each execution step. +type Context interface{} -type executor struct { +// Executor tries to execute operations. If an operation fails, everything is rolled back. +type Executor struct { operations []doer undos []undoer completed bool @@ -26,26 +31,28 @@ type executor struct { } type doer interface { - Execute(executionContext) error + Execute(Context) error } type undoer interface { - Rollback(executionContext) error + Rollback(Context) error } -func newExecutor(log *logp.Logger) *executor { +// NewExecutor return a new executor. +func NewExecutor(log *logp.Logger) *Executor { if log == nil { log = logp.NewLogger("") } log = log.Named("executor") - return &executor{log: log} + return &Executor{log: log} } -func (e *executor) Execute(ctx executionContext) (err error) { +// Execute executes all operations. If something fail it rolls back. +func (e *Executor) Execute(ctx Context) (err error) { e.log.Debugf("The executor is executing '%d' operations for converging state", len(e.operations)) if e.IsCompleted() { - return errAlreadyExecuted + return ErrAlreadyExecuted } for _, operation := range e.operations { err = operation.Execute(ctx) @@ -64,10 +71,11 @@ func (e *executor) Execute(ctx executionContext) (err error) { return err } -func (e *executor) Rollback(ctx executionContext) (err error) { +// Rollback rolls back executed operations. +func (e *Executor) Rollback(ctx Context) (err error) { e.log.Debugf("The executor is rolling back previous execution, '%d' operations to rollback", len(e.undos)) if !e.IsCompleted() { - return errNeverRun + return ErrNeverRun } for i := len(e.undos) - 1; i >= 0; i-- { operation := e.undos[i] @@ -85,18 +93,20 @@ func (e *executor) Rollback(ctx executionContext) (err error) { return err } -func (e *executor) Add(operation ...doer) error { +// Add adds new operation to execute. +func (e *Executor) Add(operation ...doer) error { if e.IsCompleted() { - return errCannotAdd + return ErrCannotAdd } e.operations = append(e.operations, operation...) return nil } -func (e *executor) markCompleted() { +func (e *Executor) markCompleted() { e.completed = true } -func (e *executor) IsCompleted() bool { +// IsCompleted returns if all operations are completed. +func (e *Executor) IsCompleted() bool { return e.completed } diff --git a/x-pack/functionbeat/provider/aws/executor_test.go b/x-pack/functionbeat/manager/executor/executor_test.go similarity index 88% rename from x-pack/functionbeat/provider/aws/executor_test.go rename to x-pack/functionbeat/manager/executor/executor_test.go index 06d32ae30e1..024ef79af1d 100644 --- a/x-pack/functionbeat/provider/aws/executor_test.go +++ b/x-pack/functionbeat/manager/executor/executor_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package aws +package executor import ( "errors" @@ -16,12 +16,12 @@ type MockUndoer struct { mock.Mock } -func (m *MockUndoer) Execute(_ executionContext) error { +func (m *MockUndoer) Execute(_ Context) error { args := m.Called() return args.Error(0) } -func (m *MockUndoer) Rollback(_ executionContext) error { +func (m *MockUndoer) Rollback(_ Context) error { args := m.Called() return args.Error(0) } @@ -30,7 +30,7 @@ type MockDoer struct { mock.Mock } -func (m *MockDoer) Execute(_ executionContext) error { +func (m *MockDoer) Execute(_ Context) error { args := m.Called() return args.Error(0) } @@ -46,7 +46,7 @@ func TestExecutor(t *testing.T) { func testAll(t *testing.T) { ctx := struct{}{} - executor := newExecutor(nil) + executor := NewExecutor(nil) m1 := &MockDoer{} m1.On("Execute").Return(nil) @@ -65,7 +65,7 @@ func testAll(t *testing.T) { func testError(t *testing.T) { ctx := struct{}{} - executor := newExecutor(nil) + executor := NewExecutor(nil) m1 := &MockDoer{} m1.On("Execute").Return(nil) @@ -87,7 +87,7 @@ func testError(t *testing.T) { func testUndoer(t *testing.T) { ctx := struct{}{} - executor := newExecutor(nil) + executor := NewExecutor(nil) m1 := &MockUndoer{} m1.On("Execute").Return(nil) m1.On("Rollback").Return(nil) @@ -118,7 +118,7 @@ func testFailRollback(t *testing.T) { e := errors.New("error on execution") e2 := errors.New("error on rollback") - executor := newExecutor(nil) + executor := NewExecutor(nil) m1 := &MockUndoer{} m1.On("Execute").Return(nil) @@ -148,7 +148,7 @@ func testFailRollback(t *testing.T) { func testCannotRunTwice(t *testing.T) { ctx := struct{}{} - executor := newExecutor(nil) + executor := NewExecutor(nil) m1 := &MockDoer{} m1.On("Execute").Return(nil) @@ -161,11 +161,11 @@ func testCannotRunTwice(t *testing.T) { m1.AssertExpectations(t) assert.True(t, executor.IsCompleted()) - assert.Error(t, errAlreadyExecuted, executor.Execute(ctx)) + assert.Error(t, ErrAlreadyExecuted, executor.Execute(ctx)) } func testCannotAddCompleted(t *testing.T) { - executor := newExecutor(nil) + executor := NewExecutor(nil) m1 := &MockDoer{} m1.On("Execute").Return(nil) diff --git a/x-pack/functionbeat/provider/aws/api_gateway_proxy.go b/x-pack/functionbeat/provider/aws/aws/api_gateway_proxy.go similarity index 84% rename from x-pack/functionbeat/provider/aws/api_gateway_proxy.go rename to x-pack/functionbeat/provider/aws/aws/api_gateway_proxy.go index 4425c40c392..6b651669a16 100644 --- a/x-pack/functionbeat/provider/aws/api_gateway_proxy.go +++ b/x-pack/functionbeat/provider/aws/aws/api_gateway_proxy.go @@ -14,10 +14,11 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/core" - "github.com/elastic/beats/x-pack/functionbeat/provider" - "github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer" + "github.com/elastic/beats/x-pack/functionbeat/function/core" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws/transformer" ) type message struct { @@ -37,6 +38,11 @@ func NewAPIGatewayProxy(provider provider.Provider, config *common.Config) (prov return &APIGatewayProxy{log: logp.NewLogger("api gateway proxy")}, nil } +// APIGatewayProxyDetails returns the details of the feature. +func APIGatewayProxyDetails() *feature.Details { + return feature.NewDetails("API Gateway proxy trigger", "receive events from the api gateway proxy", feature.Experimental) +} + // Run starts the lambda function and wait for web triggers. func (a *APIGatewayProxy) Run(_ context.Context, client core.Client) error { lambda.Start(a.createHandler(client)) diff --git a/x-pack/functionbeat/provider/aws/api_gateway_proxy_test.go b/x-pack/functionbeat/provider/aws/aws/api_gateway_proxy_test.go similarity index 97% rename from x-pack/functionbeat/provider/aws/api_gateway_proxy_test.go rename to x-pack/functionbeat/provider/aws/aws/api_gateway_proxy_test.go index 81683fbb327..74d527f65a3 100644 --- a/x-pack/functionbeat/provider/aws/api_gateway_proxy_test.go +++ b/x-pack/functionbeat/provider/aws/aws/api_gateway_proxy_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" ) func TestAPIGatewayProxy(t *testing.T) { diff --git a/x-pack/functionbeat/provider/aws/cloudwatch_logs.go b/x-pack/functionbeat/provider/aws/aws/cloudwatch_logs.go similarity index 90% rename from x-pack/functionbeat/provider/aws/cloudwatch_logs.go rename to x-pack/functionbeat/provider/aws/aws/cloudwatch_logs.go index ffd95b1dca7..aabf2b30289 100644 --- a/x-pack/functionbeat/provider/aws/cloudwatch_logs.go +++ b/x-pack/functionbeat/provider/aws/aws/cloudwatch_logs.go @@ -17,10 +17,11 @@ import ( "github.com/awslabs/goformation/cloudformation" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/core" - "github.com/elastic/beats/x-pack/functionbeat/provider" - "github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer" + "github.com/elastic/beats/x-pack/functionbeat/function/core" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws/transformer" ) var ( @@ -33,7 +34,7 @@ type CloudwatchLogsConfig struct { Triggers []*CloudwatchLogsTriggerConfig `config:"triggers"` Description string `config:"description"` Name string `config:"name" validate:"nonzero,required"` - LambdaConfig *lambdaConfig `config:",inline"` + LambdaConfig *LambdaConfig `config:",inline"` } // CloudwatchLogsTriggerConfig is the configuration for the specific triggers for cloudwatch. @@ -95,6 +96,11 @@ func NewCloudwatchLogs(provider provider.Provider, cfg *common.Config) (provider return &CloudwatchLogs{log: logp.NewLogger("cloudwatch_logs"), config: config}, nil } +// CloudwatchLogsDetails returns the details of the feature. +func CloudwatchLogsDetails() *feature.Details { + return feature.NewDetails("Cloudwatch Logs trigger", "receive events from cloudwatch logs.", feature.Stable) +} + // Run start the AWS lambda handles and will transform any events received to the pipeline. func (c *CloudwatchLogs) Run(_ context.Context, client core.Client) error { lambda.Start(c.createHandler(client)) @@ -165,7 +171,7 @@ func (r *AWSLogsSubscriptionFilter) AWSCloudFormationType() string { // Template returns the cloudformation template for configuring the service with the specified triggers. func (c *CloudwatchLogs) Template() *cloudformation.Template { prefix := func(suffix string) string { - return normalizeResourceName("fnb" + c.config.Name + suffix) + return NormalizeResourceName("fnb" + c.config.Name + suffix) } template := cloudformation.NewTemplate() @@ -197,7 +203,7 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template { } // doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-logs-subscriptionfilter.html - template.Resources[prefix("SF")+normalizeResourceName(string(trigger.LogGroupName))] = &AWSLogsSubscriptionFilter{ + template.Resources[prefix("SF")+NormalizeResourceName(string(trigger.LogGroupName))] = &AWSLogsSubscriptionFilter{ DestinationArn: cloudformation.GetAtt(prefix(""), "Arn"), FilterPattern: trigger.FilterPattern, LogGroupName: string(trigger.LogGroupName), @@ -207,7 +213,7 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template { } // LambdaConfig returns the configuration to use when creating the lambda. -func (c *CloudwatchLogs) LambdaConfig() *lambdaConfig { +func (c *CloudwatchLogs) LambdaConfig() *LambdaConfig { return c.config.LambdaConfig } diff --git a/x-pack/functionbeat/provider/aws/cloudwatch_logs_test.go b/x-pack/functionbeat/provider/aws/aws/cloudwatch_logs_test.go similarity index 98% rename from x-pack/functionbeat/provider/aws/cloudwatch_logs_test.go rename to x-pack/functionbeat/provider/aws/aws/cloudwatch_logs_test.go index 0d6d41a3da0..e746f18f58b 100644 --- a/x-pack/functionbeat/provider/aws/cloudwatch_logs_test.go +++ b/x-pack/functionbeat/provider/aws/aws/cloudwatch_logs_test.go @@ -18,7 +18,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" ) type arrayBackedClient struct { diff --git a/x-pack/functionbeat/provider/aws/config.go b/x-pack/functionbeat/provider/aws/aws/config.go similarity index 89% rename from x-pack/functionbeat/provider/aws/config.go rename to x-pack/functionbeat/provider/aws/aws/config.go index 373f4772bac..0571e3ce932 100644 --- a/x-pack/functionbeat/provider/aws/config.go +++ b/x-pack/functionbeat/provider/aws/aws/config.go @@ -26,7 +26,7 @@ const maxMegabytes = 3008 // DefaultLambdaConfig confguration for AWS lambda function. var ( - DefaultLambdaConfig = &lambdaConfig{ + DefaultLambdaConfig = &LambdaConfig{ MemorySize: 128 * 1024 * 1024, Timeout: time.Second * 3, Concurrency: 5, @@ -35,9 +35,13 @@ var ( // Source: https://docs.aws.amazon.com/lambda/latest/dg/API_CreateFunction.html#SSS-CreateFunction-request-Role arnRolePattern = "arn:(aws[a-zA-Z-]*)?:iam::\\d{12}:role/?[a-zA-Z_0-9+=,.@\\-_/]+" roleRE = regexp.MustCompile(arnRolePattern) + + // Chars for resource name anything else will be replaced. + validChars = regexp.MustCompile("[^a-zA-Z0-9]") ) -type lambdaConfig struct { +// LambdaConfig stores the common configuration of Lambda functions. +type LambdaConfig struct { Concurrency int `config:"concurrency" validate:"min=0,max=1000"` DeadLetterConfig *deadLetterConfig `config:"dead_letter_config"` Description string `config:"description"` @@ -47,7 +51,8 @@ type lambdaConfig struct { VPCConfig *vpcConfig `config:"virtual_private_cloud"` } -func (c *lambdaConfig) Validate() error { +// Validate checks a LambdaConfig +func (c *LambdaConfig) Validate() error { if c.MemorySize.Megabytes() == 0 { return fmt.Errorf("'memory_size' need to be higher than 0 and must be a factor 64") } @@ -127,3 +132,8 @@ func (b *bucket) Unpack(s string) error { *b = bucket(s) return nil } + +// NormalizeResourceName extracts invalid chars. +func NormalizeResourceName(s string) string { + return validChars.ReplaceAllString(s, "") +} diff --git a/x-pack/functionbeat/provider/aws/config_test.go b/x-pack/functionbeat/provider/aws/aws/config_test.go similarity index 68% rename from x-pack/functionbeat/provider/aws/config_test.go rename to x-pack/functionbeat/provider/aws/aws/config_test.go index 715c0288e37..804f7eefa8c 100644 --- a/x-pack/functionbeat/provider/aws/config_test.go +++ b/x-pack/functionbeat/provider/aws/aws/config_test.go @@ -68,3 +68,39 @@ func TestBucket(t *testing.T) { assert.Error(t, err) }) } + +func TestNormalize(t *testing.T) { + tests := []struct { + title string + candidate string + chars string + expected string + }{ + { + title: "when the string contains invalid chars", + candidate: "/var/log-alpha/tmp:ok", + expected: "varlogalphatmpok", + }, + { + title: "when we have an empty string", + candidate: "", + expected: "", + }, + { + title: "when we don't have any invalid chars", + candidate: "hello", + expected: "hello", + }, + { + title: "when the string contains underscore", + candidate: "/var/log-alpha/tmp:ok_moreok", + expected: "varlogalphatmpokmoreok", + }, + } + + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + assert.Equal(t, test.expected, NormalizeResourceName(test.candidate)) + }) + } +} diff --git a/x-pack/functionbeat/provider/aws/kinesis.go b/x-pack/functionbeat/provider/aws/aws/kinesis.go similarity index 90% rename from x-pack/functionbeat/provider/aws/kinesis.go rename to x-pack/functionbeat/provider/aws/aws/kinesis.go index 56df5ab83df..e38671bba3a 100644 --- a/x-pack/functionbeat/provider/aws/kinesis.go +++ b/x-pack/functionbeat/provider/aws/aws/kinesis.go @@ -16,10 +16,11 @@ import ( "github.com/awslabs/goformation/cloudformation" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/core" - "github.com/elastic/beats/x-pack/functionbeat/provider" - "github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer" + "github.com/elastic/beats/x-pack/functionbeat/function/core" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws/transformer" ) type startingPosition uint @@ -76,7 +77,7 @@ type KinesisConfig struct { Description string `config:"description"` Name string `config:"name" validate:"nonzero,required"` Triggers []*KinesisTriggerConfig `config:"triggers"` - LambdaConfig *lambdaConfig `config:",inline"` + LambdaConfig *LambdaConfig `config:",inline"` } // Validate validates the configuration. @@ -123,6 +124,11 @@ func NewKinesis(provider provider.Provider, cfg *common.Config) (provider.Functi return &Kinesis{log: logp.NewLogger("kinesis"), config: config}, nil } +// KinesisDetails returns the details of the feature. +func KinesisDetails() *feature.Details { + return feature.NewDetails("Kinesis trigger", "receive events from a Kinesis stream", feature.Stable) +} + // Run starts the lambda function and wait for web triggers. func (k *Kinesis) Run(_ context.Context, client core.Client) error { lambda.Start(k.createHandler(client)) @@ -149,7 +155,7 @@ func (k *Kinesis) Name() string { } // LambdaConfig returns the configuration to use when creating the lambda. -func (k *Kinesis) LambdaConfig() *lambdaConfig { +func (k *Kinesis) LambdaConfig() *LambdaConfig { return k.config.LambdaConfig } @@ -158,7 +164,7 @@ func (k *Kinesis) LambdaConfig() *lambdaConfig { func (k *Kinesis) Template() *cloudformation.Template { template := cloudformation.NewTemplate() prefix := func(suffix string) string { - return normalizeResourceName("fnb" + k.config.Name + suffix) + return NormalizeResourceName("fnb" + k.config.Name + suffix) } for _, trigger := range k.config.Triggers { diff --git a/x-pack/functionbeat/provider/aws/kinesis_test.go b/x-pack/functionbeat/provider/aws/aws/kinesis_test.go similarity index 79% rename from x-pack/functionbeat/provider/aws/kinesis_test.go rename to x-pack/functionbeat/provider/aws/aws/kinesis_test.go index 6304fed67cc..6178758fede 100644 --- a/x-pack/functionbeat/provider/aws/kinesis_test.go +++ b/x-pack/functionbeat/provider/aws/aws/kinesis_test.go @@ -10,11 +10,10 @@ import ( "testing" "github.com/aws/aws-lambda-go/events" - "github.com/awslabs/goformation/cloudformation" "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" ) func TestKinesis(t *testing.T) { @@ -57,7 +56,6 @@ func TestKinesis(t *testing.T) { t.Run("test config validation", testKinesisConfig) t.Run("test starting position", testStartingPosition) - t.Run("test permissions for event_source_arn", testPolicies) } func generateKinesisEvent() events.KinesisEvent { @@ -195,53 +193,3 @@ func testStartingPosition(t *testing.T) { assert.Error(t, err) }) } - -func testPolicies(t *testing.T) { - cfg := common.MustNewConfigFrom(map[string]interface{}{ - "name": "myfunction", - "description": "mydescription", - "triggers": []map[string]interface{}{ - map[string]interface{}{ - "event_source_arn": "abc456", - }, - map[string]interface{}{ - "event_source_arn": "abc1234", - }, - }, - }) - - k, err := NewKinesis(&provider.DefaultProvider{}, cfg) - if !assert.NoError(t, err) { - return - } - - i, ok := k.(installer) - if !assert.True(t, ok) { - return - } - - policies := i.Policies() - if !assert.Equal(t, 1, len(policies)) { - return - } - - // ensure permissions on specified resources - expected := cloudformation.AWSIAMRole_Policy{ - PolicyName: cloudformation.Join("-", []string{"fnb", "kinesis", "myfunction"}), - PolicyDocument: map[string]interface{}{ - "Statement": []map[string]interface{}{ - map[string]interface{}{ - "Action": []string{ - "kinesis:GetRecords", - "kinesis:GetShardIterator", - "Kinesis:DescribeStream", - }, - "Effect": "Allow", - "Resource": []string{"abc1234", "abc456"}, - }, - }, - }, - } - - assert.Equal(t, expected, policies[0]) -} diff --git a/x-pack/functionbeat/provider/aws/sqs.go b/x-pack/functionbeat/provider/aws/aws/sqs.go similarity index 86% rename from x-pack/functionbeat/provider/aws/sqs.go rename to x-pack/functionbeat/provider/aws/aws/sqs.go index ced6feb23c4..8c78f91d566 100644 --- a/x-pack/functionbeat/provider/aws/sqs.go +++ b/x-pack/functionbeat/provider/aws/aws/sqs.go @@ -14,10 +14,11 @@ import ( "github.com/awslabs/goformation/cloudformation" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/feature" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/x-pack/functionbeat/core" - "github.com/elastic/beats/x-pack/functionbeat/provider" - "github.com/elastic/beats/x-pack/functionbeat/provider/aws/transformer" + "github.com/elastic/beats/x-pack/functionbeat/function/core" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws/transformer" ) const batchSize = 10 @@ -27,7 +28,7 @@ type SQSConfig struct { Triggers []*SQSTriggerConfig `config:"triggers"` Description string `config:"description"` Name string `config:"name" validate:"nonzero,required"` - LambdaConfig *lambdaConfig `config:",inline"` + LambdaConfig *LambdaConfig `config:",inline"` } // SQSTriggerConfig configuration for the current trigger. @@ -58,6 +59,11 @@ func NewSQS(provider provider.Provider, cfg *common.Config) (provider.Function, return &SQS{log: logp.NewLogger("sqs"), config: config}, nil } +// SQSDetails returns the details of the feature. +func SQSDetails() *feature.Details { + return feature.NewDetails("SQS trigger", "receive events from a SQS queue", feature.Stable) +} + // Run starts the lambda function and wait for web triggers. func (s *SQS) Run(_ context.Context, client core.Client) error { lambda.Start(s.createHandler(client)) @@ -88,11 +94,11 @@ func (s *SQS) Template() *cloudformation.Template { template := cloudformation.NewTemplate() prefix := func(suffix string) string { - return normalizeResourceName("fnb" + s.config.Name + suffix) + return NormalizeResourceName("fnb" + s.config.Name + suffix) } for _, trigger := range s.config.Triggers { - resourceName := prefix("SQS") + normalizeResourceName(trigger.EventSourceArn) + resourceName := prefix("SQS") + NormalizeResourceName(trigger.EventSourceArn) template.Resources[resourceName] = &cloudformation.AWSLambdaEventSourceMapping{ BatchSize: batchSize, EventSourceArn: trigger.EventSourceArn, @@ -147,6 +153,6 @@ func (s *SQS) Policies() []cloudformation.AWSIAMRole_Policy { } // LambdaConfig returns the configuration to use when creating the lambda. -func (s *SQS) LambdaConfig() *lambdaConfig { +func (s *SQS) LambdaConfig() *LambdaConfig { return s.config.LambdaConfig } diff --git a/x-pack/functionbeat/provider/aws/sqs_test.go b/x-pack/functionbeat/provider/aws/aws/sqs_test.go similarity index 95% rename from x-pack/functionbeat/provider/aws/sqs_test.go rename to x-pack/functionbeat/provider/aws/aws/sqs_test.go index db4aa93dc05..2d5ec40b4c3 100644 --- a/x-pack/functionbeat/provider/aws/sqs_test.go +++ b/x-pack/functionbeat/provider/aws/aws/sqs_test.go @@ -12,7 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" ) func TestSQS(t *testing.T) { diff --git a/x-pack/functionbeat/provider/aws/transformer/transformer.go b/x-pack/functionbeat/provider/aws/aws/transformer/transformer.go similarity index 100% rename from x-pack/functionbeat/provider/aws/transformer/transformer.go rename to x-pack/functionbeat/provider/aws/aws/transformer/transformer.go diff --git a/x-pack/functionbeat/provider/aws/transformer/transformer_test.go b/x-pack/functionbeat/provider/aws/aws/transformer/transformer_test.go similarity index 100% rename from x-pack/functionbeat/provider/aws/transformer/transformer_test.go rename to x-pack/functionbeat/provider/aws/aws/transformer/transformer_test.go diff --git a/x-pack/functionbeat/provider/aws/cmd/root.go b/x-pack/functionbeat/provider/aws/cmd/root.go new file mode 100644 index 00000000000..495ba81ffcc --- /dev/null +++ b/x-pack/functionbeat/provider/aws/cmd/root.go @@ -0,0 +1,25 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "flag" + + "github.com/elastic/beats/x-pack/functionbeat/function/beater" + funcmd "github.com/elastic/beats/x-pack/functionbeat/function/cmd" +) + +// Name of this beat +var Name = "functionbeat" + +// RootCmd to handle functionbeat +var RootCmd *funcmd.FunctionCmd + +func init() { + RootCmd = funcmd.NewFunctionCmd(Name, beater.New) + RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("d")) + RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("v")) + RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("e")) +} diff --git a/x-pack/functionbeat/provider/aws/include/feature.go b/x-pack/functionbeat/provider/aws/include/feature.go new file mode 100644 index 00000000000..4d6b270cde9 --- /dev/null +++ b/x-pack/functionbeat/provider/aws/include/feature.go @@ -0,0 +1,34 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package include + +import ( + "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/aws" +) + +// Bundle exposes the trigger supported by the AWS provider. +var bundle = provider.MustCreate( + "aws", + provider.NewDefaultProvider("aws", provider.NewNullCli, provider.NewNullTemplateBuilder), + feature.NewDetails("AWS Lambda", "listen to events on AWS lambda", feature.Stable), +).MustAddFunction("cloudwatch_logs", + aws.NewCloudwatchLogs, + aws.CloudwatchLogsDetails(), +).MustAddFunction("api_gateway_proxy", + aws.NewAPIGatewayProxy, + aws.APIGatewayProxyDetails(), +).MustAddFunction("kinesis", + aws.NewKinesis, + aws.KinesisDetails(), +).MustAddFunction("sqs", + aws.NewSQS, + aws.SQSDetails(), +).Bundle() + +func init() { + feature.MustRegisterBundle(bundle) +} diff --git a/x-pack/functionbeat/provider/aws/main.go b/x-pack/functionbeat/provider/aws/main.go new file mode 100644 index 00000000000..2016f5ade04 --- /dev/null +++ b/x-pack/functionbeat/provider/aws/main.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "os" + + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/cmd" + _ "github.com/elastic/beats/x-pack/functionbeat/provider/aws/include" +) + +func main() { + if err := cmd.RootCmd.Execute(); err != nil { + os.Exit(1) + } +} diff --git a/x-pack/functionbeat/provider/aws/main_test.go b/x-pack/functionbeat/provider/aws/main_test.go new file mode 100644 index 00000000000..af229d796c3 --- /dev/null +++ b/x-pack/functionbeat/provider/aws/main_test.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +// This file is mandatory as otherwise the functionbeat.test binary is not generated correctly. + +import ( + "flag" + "testing" + + "github.com/elastic/beats/x-pack/functionbeat/provider/aws/cmd" +) + +var systemTest *bool + +func init() { + systemTest = flag.Bool("systemTest", false, "Set to true when running system tests") + + cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest")) + cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile")) +} + +// Test started when the test binary is started. Only calls main. +func TestSystem(t *testing.T) { + + if *systemTest { + main() + } +} diff --git a/x-pack/functionbeat/provider/local/cmd/root.go b/x-pack/functionbeat/provider/local/cmd/root.go new file mode 100644 index 00000000000..599eac10056 --- /dev/null +++ b/x-pack/functionbeat/provider/local/cmd/root.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + funcmd "github.com/elastic/beats/x-pack/functionbeat/function/cmd" + "github.com/elastic/beats/x-pack/functionbeat/manager/beater" +) + +// Name of this beat +var Name = "functionbeat" + +// RootCmd to handle functionbeat +var RootCmd *funcmd.FunctionCmd + +func init() { + RootCmd = funcmd.NewFunctionCmd(Name, beater.New) +} diff --git a/x-pack/functionbeat/provider/local/include/feature.go b/x-pack/functionbeat/provider/local/include/feature.go new file mode 100644 index 00000000000..5de6edd3874 --- /dev/null +++ b/x-pack/functionbeat/provider/local/include/feature.go @@ -0,0 +1,19 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package include + +import ( + "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/x-pack/functionbeat/provider/local/local" +) + +// Bundle feature enabled. +var Bundle = feature.MustBundle( + local.Bundle, +) + +func init() { + feature.MustRegisterBundle(Bundle) +} diff --git a/x-pack/functionbeat/provider/local/local.go b/x-pack/functionbeat/provider/local/local/local.go similarity index 95% rename from x-pack/functionbeat/provider/local/local.go rename to x-pack/functionbeat/provider/local/local/local.go index bf4318c46e7..3d767fe27d2 100644 --- a/x-pack/functionbeat/provider/local/local.go +++ b/x-pack/functionbeat/provider/local/local/local.go @@ -13,8 +13,8 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/feature" - "github.com/elastic/beats/x-pack/functionbeat/core" - "github.com/elastic/beats/x-pack/functionbeat/provider" + "github.com/elastic/beats/x-pack/functionbeat/function/core" + "github.com/elastic/beats/x-pack/functionbeat/function/provider" ) const stdinName = "stdin" diff --git a/x-pack/functionbeat/provider/local/main.go b/x-pack/functionbeat/provider/local/main.go new file mode 100644 index 00000000000..4bb606b278d --- /dev/null +++ b/x-pack/functionbeat/provider/local/main.go @@ -0,0 +1,18 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "os" + + "github.com/elastic/beats/x-pack/functionbeat/provider/local/cmd" + _ "github.com/elastic/beats/x-pack/functionbeat/provider/local/include" // imports features +) + +func main() { + if err := cmd.RootCmd.Execute(); err != nil { + os.Exit(1) + } +} diff --git a/x-pack/functionbeat/provider/local/main_test.go b/x-pack/functionbeat/provider/local/main_test.go new file mode 100644 index 00000000000..141f96777a5 --- /dev/null +++ b/x-pack/functionbeat/provider/local/main_test.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +// This file is mandatory as otherwise the functionbeat.test binary is not generated correctly. + +import ( + "flag" + "testing" + + "github.com/elastic/beats/x-pack/functionbeat/provider/local/cmd" +) + +var systemTest *bool + +func init() { + systemTest = flag.Bool("systemTest", false, "Set to true when running system tests") + + cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest")) + cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile")) +} + +// Test started when the test binary is started. Only calls main. +func TestSystem(t *testing.T) { + if *systemTest { + main() + } +} diff --git a/x-pack/functionbeat/scripts/mage/config.go b/x-pack/functionbeat/scripts/mage/config.go index 571bff4a6de..7f65e164d83 100644 --- a/x-pack/functionbeat/scripts/mage/config.go +++ b/x-pack/functionbeat/scripts/mage/config.go @@ -5,19 +5,19 @@ package mage import ( - "github.com/elastic/beats/dev-tools/mage" + devtools "github.com/elastic/beats/dev-tools/mage" ) // XPackConfigFileParams returns the configuration of sample and reference configuration data. -func XPackConfigFileParams() mage.ConfigFileParams { - return mage.ConfigFileParams{ +func XPackConfigFileParams() devtools.ConfigFileParams { + return devtools.ConfigFileParams{ ShortParts: []string{ - mage.OSSBeatDir("_meta/beat.yml"), - mage.LibbeatDir("_meta/config.yml.tmpl"), + devtools.OSSBeatDir("_meta/beat.yml"), + devtools.LibbeatDir("_meta/config.yml.tmpl"), }, ReferenceParts: []string{ - mage.OSSBeatDir("_meta/beat.reference.yml"), - mage.LibbeatDir("_meta/config.reference.yml.tmpl"), + devtools.OSSBeatDir("_meta/beat.reference.yml"), + devtools.LibbeatDir("_meta/config.reference.yml.tmpl"), }, ExtraVars: map[string]interface{}{ "ExcludeConsole": true, diff --git a/x-pack/functionbeat/scripts/mage/providers.go b/x-pack/functionbeat/scripts/mage/providers.go new file mode 100644 index 00000000000..40013d3214e --- /dev/null +++ b/x-pack/functionbeat/scripts/mage/providers.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mage + +import ( + "os" + "strings" +) + +var ( + // SelectedProviders is the list of selected providers + // Can be configured by setting PROVIDERS enviroment variable. + SelectedProviders = getConfiguredProviders() + + availableProviders = []string{ + "aws", + } +) + +func getConfiguredProviders() []string { + providers := os.Getenv("PROVIDERS") + if len(providers) == 0 { + return availableProviders + } + + return strings.Split(providers, ",") +} diff --git a/x-pack/functionbeat/scripts/mage/update.go b/x-pack/functionbeat/scripts/mage/update.go new file mode 100644 index 00000000000..82ab15796a1 --- /dev/null +++ b/x-pack/functionbeat/scripts/mage/update.go @@ -0,0 +1,48 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mage + +import ( + "github.com/magefile/mage/mg" + + devtools "github.com/elastic/beats/dev-tools/mage" +) + +// Update target namespace. +type Update mg.Namespace + +// Aliases stores aliases for the targets. +var Aliases = map[string]interface{}{ + "update": Update.All, +} + +// All updates all generated content. +func (Update) All() { + mg.Deps(Update.Fields, Update.IncludeFields, Update.Config, Update.FieldDocs) +} + +// Config generates both the short and reference configs. +func (Update) Config() error { + return devtools.Config(devtools.ShortConfigType|devtools.ReferenceConfigType, XPackConfigFileParams(), ".") +} + +// Fields generates a fields.yml for the Beat. +func (Update) Fields() error { + return devtools.GenerateFieldsYAML() +} + +// FieldDocs collects all fields by provider and generates documentation for them. +func (Update) FieldDocs() error { + mg.Deps(Update.Fields) + + return devtools.Docs.FieldDocs("fields.yml") +} + +// IncludeFields generates include/fields.go by provider. +func (Update) IncludeFields() error { + mg.Deps(Update.Fields) + + return devtools.GenerateAllInOneFieldsGo() +} diff --git a/x-pack/functionbeat/tests/system/test_base.py b/x-pack/functionbeat/tests/system/test_base.py index ad965288ccf..1154fb1baf4 100644 --- a/x-pack/functionbeat/tests/system/test_base.py +++ b/x-pack/functionbeat/tests/system/test_base.py @@ -6,7 +6,6 @@ class Test(BaseTest): - @unittest.skip("temporarily disabled") def test_base(self): """ @@ -96,16 +95,25 @@ def test_export_function_invalid_conf(self): assert exit_code != 0 def _generate_dummy_binary_for_template_checksum(self): - if os.path.exists("pkg/functionbeat"): - return + fnbeat_pkg = os.path.join("pkg", "functionbeat") + fnbeat_aws_pkg = os.path.join("pkg", "functionbeat-aws") + bins_to_gen = [fnbeat_pkg, fnbeat_aws_pkg] + + if not os.path.exists("pkg"): + os.mkdir("pkg") - os.mkdir("pkg") - with open("pkg/functionbeat", "wb") as f: - f.write("my dummy functionbeat binary") + for fb in bins_to_gen: + if os.path.exists(fb): + continue + with open(fb, "wb") as f: + f.write("my dummy functionbeat binary\n") def _get_generated_function_template(self): logs = self.get_log_lines() - function_template_lines = logs[:-2] + skipped_lines = -1 + if os.sys.platform.startswith("win"): + skipped_lines = -2 + function_template_lines = logs[:skipped_lines] raw_function_temaplate = "".join(function_template_lines) function_template = json.loads(raw_function_temaplate) return function_template diff --git a/x-pack/metricbeat/magefile.go b/x-pack/metricbeat/magefile.go index cce41582ca5..f5b1fc87d00 100644 --- a/x-pack/metricbeat/magefile.go +++ b/x-pack/metricbeat/magefile.go @@ -28,12 +28,6 @@ func init() { devtools.BeatLicense = "Elastic License" } -// Aliases provides compatibility with CI while we transition all Beats -// to having common testing targets. -var Aliases = map[string]interface{}{ - "goTestUnit": GoUnitTest, // dev-tools/jenkins_ci.ps1 uses this. -} - // Build builds the Beat binary. func Build() error { return devtools.Build(devtools.DefaultBuildArgs())