diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4726420da20..a49e352f600 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -316,6 +316,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Change ownership of files in docker images so they can be used in secured environments. {pull}12905[12905] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] - Add support for multiple sets of hints on autodiscover {pull}18883[18883] +- Add a configurable delay between retries when an app metadata cannot be retrieved by `add_cloudfoundry_metadata`. {pull}19181[19181] *Auditbeat* diff --git a/x-pack/libbeat/common/cloudfoundry/cache.go b/x-pack/libbeat/common/cloudfoundry/cache.go index bc87fb11836..22f41f3b23c 100644 --- a/x-pack/libbeat/common/cloudfoundry/cache.go +++ b/x-pack/libbeat/common/cloudfoundry/cache.go @@ -22,43 +22,57 @@ type cfClient interface { // clientCacheWrap wraps the cloudfoundry client to add a cache in front of GetAppByGuid. type clientCacheWrap struct { - cache *common.Cache - client cfClient - log *logp.Logger + cache *common.Cache + client cfClient + log *logp.Logger + errorTTL time.Duration } // newClientCacheWrap creates a new cache for application data. -func newClientCacheWrap(client cfClient, ttl time.Duration, log *logp.Logger) *clientCacheWrap { +func newClientCacheWrap(client cfClient, ttl time.Duration, errorTTL time.Duration, log *logp.Logger) *clientCacheWrap { return &clientCacheWrap{ - cache: common.NewCacheWithExpireOnAdd(ttl, 100), - client: client, - log: log, + cache: common.NewCacheWithExpireOnAdd(ttl, 100), + client: client, + errorTTL: errorTTL, + log: log, } } +type appResponse struct { + app *cfclient.App + err error +} + // fetchApp uses the cfClient to retrieve an App entity and // stores it in the internal cache func (c *clientCacheWrap) fetchAppByGuid(guid string) (*cfclient.App, error) { app, err := c.client.GetAppByGuid(guid) + resp := appResponse{ + app: &app, + err: err, + } + timeout := time.Duration(0) if err != nil { - return nil, err + // Cache nil, because is what we want to return when there was an error + resp.app = nil + timeout = c.errorTTL } - c.cache.Put(app.Guid, &app) - return &app, nil + c.cache.PutWithTimeout(guid, &resp, timeout) + return resp.app, resp.err } // GetApp returns CF Application info, either from the cache or // using the CF client. func (c *clientCacheWrap) GetAppByGuid(guid string) (*cfclient.App, error) { - cachedApp := c.cache.Get(guid) - if cachedApp == nil { + cachedResp := c.cache.Get(guid) + if cachedResp == nil { return c.fetchAppByGuid(guid) } - app, ok := cachedApp.(*cfclient.App) + resp, ok := cachedResp.(*appResponse) if !ok { - return nil, fmt.Errorf("error converting cached app") + return nil, fmt.Errorf("error converting cached app response (of type %T), this is likely a bug", cachedResp) } - return app, nil + return resp.app, resp.err } // StartJanitor starts a goroutine that will periodically clean the applications cache. diff --git a/x-pack/libbeat/common/cloudfoundry/cache_integration_test.go b/x-pack/libbeat/common/cloudfoundry/cache_integration_test.go new file mode 100644 index 00000000000..f6af11787c9 --- /dev/null +++ b/x-pack/libbeat/common/cloudfoundry/cache_integration_test.go @@ -0,0 +1,77 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build integration +// +build cloudfoundry + +package cloudfoundry + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cloudfoundry-community/go-cfclient" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test" +) + +func TestGetApps(t *testing.T) { + var conf Config + err := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t)).Unpack(&conf) + require.NoError(t, err) + + log := logp.NewLogger("cloudfoundry") + hub := NewHub(&conf, "filebeat", log) + + client, err := hub.Client() + require.NoError(t, err) + apps, err := client.(*clientCacheWrap).client.(*cfclient.Client).ListApps() + require.NoError(t, err) + + t.Logf("%d applications available", len(apps)) + + t.Run("request one of the available applications", func(t *testing.T) { + if len(apps) == 0 { + t.Skip("no apps in account?") + } + client, err := hub.Client() + require.NoError(t, err) + + guid := apps[0].Guid + app, err := client.GetAppByGuid(guid) + assert.Equal(t, guid, app.Guid) + assert.NoError(t, err) + }) + + t.Run("handle error when application is not available", func(t *testing.T) { + client, err := hub.Client() + require.NoError(t, err) + + testNotExists := func(t *testing.T) { + app, err := client.GetAppByGuid("notexists") + assert.Nil(t, app) + assert.True(t, cfclient.IsAppNotFoundError(err)) + } + + var firstTimeDuration time.Duration + t.Run("first call", func(t *testing.T) { + startTime := time.Now() + testNotExists(t) + firstTimeDuration = time.Now().Sub(startTime) + }) + + t.Run("second call, in cache, faster, same response", func(t *testing.T) { + for i := 0; i < 10; i++ { + startTime := time.Now() + testNotExists(t) + require.True(t, firstTimeDuration > time.Now().Sub(startTime)) + } + }) + }) +} diff --git a/x-pack/libbeat/common/cloudfoundry/cache_test.go b/x-pack/libbeat/common/cloudfoundry/cache_test.go index 678dd74ecfe..9e18a5ac86e 100644 --- a/x-pack/libbeat/common/cloudfoundry/cache_test.go +++ b/x-pack/libbeat/common/cloudfoundry/cache_test.go @@ -7,7 +7,6 @@ package cloudfoundry import ( - "fmt" "testing" "time" @@ -26,17 +25,27 @@ func TestClientCacheWrap(t *testing.T) { Memory: 1, // use this field to track if from cache or from client } fakeClient := &fakeCFClient{app, 0} - cache := newClientCacheWrap(fakeClient, ttl, logp.NewLogger("cloudfoundry")) + cache := newClientCacheWrap(fakeClient, ttl, ttl, logp.NewLogger("cloudfoundry")) + + missingAppGuid := mustCreateFakeGuid() // should err; different app client doesn't have - _, err := cache.GetAppByGuid(mustCreateFakeGuid()) - assert.Error(t, err) + one, err := cache.GetAppByGuid(missingAppGuid) + assert.Nil(t, one) + assert.True(t, cfclient.IsAppNotFoundError(err)) + assert.Equal(t, 1, fakeClient.callCount) + + // calling again; the miss should be cached + one, err = cache.GetAppByGuid(missingAppGuid) + assert.Nil(t, one) + assert.True(t, cfclient.IsAppNotFoundError(err)) + assert.Equal(t, 1, fakeClient.callCount) // fetched from client for the first time - one, err := cache.GetAppByGuid(guid) + one, err = cache.GetAppByGuid(guid) assert.NoError(t, err) assert.Equal(t, app, *one) - assert.Equal(t, 1, fakeClient.callCount) + assert.Equal(t, 2, fakeClient.callCount) // updated app in fake client, new fetch should not have updated app updatedApp := cfclient.App{ @@ -47,14 +56,14 @@ func TestClientCacheWrap(t *testing.T) { two, err := cache.GetAppByGuid(guid) assert.NoError(t, err) assert.Equal(t, app, *two) - assert.Equal(t, 1, fakeClient.callCount) + assert.Equal(t, 2, fakeClient.callCount) // wait the ttl, then it should have updated app time.Sleep(ttl) three, err := cache.GetAppByGuid(guid) assert.NoError(t, err) assert.Equal(t, updatedApp, *three) - assert.Equal(t, 2, fakeClient.callCount) + assert.Equal(t, 3, fakeClient.callCount) } type fakeCFClient struct { @@ -63,10 +72,10 @@ type fakeCFClient struct { } func (f *fakeCFClient) GetAppByGuid(guid string) (cfclient.App, error) { + f.callCount++ if f.app.Guid != guid { - return f.app, fmt.Errorf("no app with guid") + return cfclient.App{}, notFoundError() } - f.callCount++ return f.app, nil } @@ -77,3 +86,8 @@ func mustCreateFakeGuid() string { } return uuid.String() } + +// notFoundError returns a cloud foundry error that satisfies cfclient.IsAppNotFoundError(err) +func notFoundError() error { + return cfclient.CloudFoundryError{Code: 100004} +} diff --git a/x-pack/libbeat/common/cloudfoundry/config.go b/x-pack/libbeat/common/cloudfoundry/config.go index 8f1139bd7a2..0724bdc66e1 100644 --- a/x-pack/libbeat/common/cloudfoundry/config.go +++ b/x-pack/libbeat/common/cloudfoundry/config.go @@ -41,8 +41,11 @@ type Config struct { // multiple filebeats will shard the load of receiving and sending events. ShardID string `config:"shard_id"` - // Maximum amount of time to cache application objects from CF client + // Maximum amount of time to cache application objects from CF client. CacheDuration time.Duration `config:"cache_duration"` + + // Time to wait before retrying to get application info in case of error. + CacheRetryDelay time.Duration `config:"cache_retry_delay"` } // InitDefaults initialize the defaults for the configuration. @@ -55,6 +58,7 @@ func (c *Config) InitDefaults() { } c.ShardID = uuid.String() c.CacheDuration = 120 * time.Second + c.CacheRetryDelay = 20 * time.Second c.Version = ConsumerVersionV1 } diff --git a/x-pack/libbeat/common/cloudfoundry/hub.go b/x-pack/libbeat/common/cloudfoundry/hub.go index 823087ea959..4bb7fce1eec 100644 --- a/x-pack/libbeat/common/cloudfoundry/hub.go +++ b/x-pack/libbeat/common/cloudfoundry/hub.go @@ -67,7 +67,7 @@ func (h *Hub) Client() (Client, error) { if h.cfg.UaaAddress != "" { cf.Endpoint.AuthEndpoint = h.cfg.UaaAddress } - return newClientCacheWrap(cf, h.cfg.CacheDuration, h.log), nil + return newClientCacheWrap(cf, h.cfg.CacheDuration, h.cfg.CacheRetryDelay, h.log), nil } // RlpListener returns a listener client that calls the passed callback when the provided events are streamed through diff --git a/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata.go b/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata.go index a80cc24700c..d18a04ca979 100644 --- a/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata.go +++ b/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata.go @@ -60,7 +60,7 @@ func (d *addCloudFoundryMetadata) Run(event *beat.Event) (*beat.Event, error) { } valI, err := event.GetValue("cloudfoundry.app.id") if err != nil { - // doesn't have the required cf.app.id value to add more information + // doesn't have the required cloudfoundry.app.id value to add more information return event, nil } val, _ := valI.(string) diff --git a/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata_test.go b/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata_test.go index ca83a3e24c2..1aff4cb2df8 100644 --- a/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata_test.go +++ b/x-pack/libbeat/processors/add_cloudfoundry_metadata/add_cloudfoundry_metadata_test.go @@ -5,7 +5,6 @@ package add_cloudfoundry_metadata import ( - "fmt" "testing" "time" @@ -148,7 +147,7 @@ type fakeClient struct { func (c *fakeClient) GetAppByGuid(guid string) (*cfclient.App, error) { if c.app.Guid != guid { - return nil, fmt.Errorf("unknown app") + return nil, cfclient.CloudFoundryError{Code: 100004} } return &c.app, nil } diff --git a/x-pack/libbeat/processors/add_cloudfoundry_metadata/docs/add_cloudfoundry_metadata.asciidoc b/x-pack/libbeat/processors/add_cloudfoundry_metadata/docs/add_cloudfoundry_metadata.asciidoc index 7c5b2daba96..558b5a1031b 100644 --- a/x-pack/libbeat/processors/add_cloudfoundry_metadata/docs/add_cloudfoundry_metadata.asciidoc +++ b/x-pack/libbeat/processors/add_cloudfoundry_metadata/docs/add_cloudfoundry_metadata.asciidoc @@ -53,6 +53,8 @@ It has the following settings: `client_secret`:: Client Secret to authenticate with Cloud Foundry. -`cache_duration`:: (Optional) Maximum amount of time to cache an application's metadata. +`cache_duration`:: (Optional) Maximum amount of time to cache an application's metadata. Defaults to 120 seconds. + +`cache_retry_delay`:: (Optional) Time to wait before trying to obtain an application's metadata again in case of error. Defaults to 20 seconds. `ssl`:: (Optional) SSL configuration to use when connecting to Cloud Foundry.