Skip to content

Commit

Permalink
Cache error responses for cloudfoundry apps metadata (elastic#19181)
Browse files Browse the repository at this point in the history
Cache error responses when requesting Cloud Foundry apps metadata to
avoid hitting continuously the API when there are missing applications.
  • Loading branch information
jsoriano authored and melchiormoulin committed Oct 14, 2020
1 parent ed19227 commit ff64dee
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Change ownership of files in docker images so they can be used in secured environments. {pull}12905[12905]
- Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817]
- Add support for multiple sets of hints on autodiscover {pull}18883[18883]
- Add a configurable delay between retries when an app metadata cannot be retrieved by `add_cloudfoundry_metadata`. {pull}19181[19181]

*Auditbeat*

Expand Down
44 changes: 29 additions & 15 deletions x-pack/libbeat/common/cloudfoundry/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,57 @@ type cfClient interface {

// clientCacheWrap wraps the cloudfoundry client to add a cache in front of GetAppByGuid.
type clientCacheWrap struct {
cache *common.Cache
client cfClient
log *logp.Logger
cache *common.Cache
client cfClient
log *logp.Logger
errorTTL time.Duration
}

// newClientCacheWrap creates a new cache for application data.
func newClientCacheWrap(client cfClient, ttl time.Duration, log *logp.Logger) *clientCacheWrap {
func newClientCacheWrap(client cfClient, ttl time.Duration, errorTTL time.Duration, log *logp.Logger) *clientCacheWrap {
return &clientCacheWrap{
cache: common.NewCacheWithExpireOnAdd(ttl, 100),
client: client,
log: log,
cache: common.NewCacheWithExpireOnAdd(ttl, 100),
client: client,
errorTTL: errorTTL,
log: log,
}
}

type appResponse struct {
app *cfclient.App
err error
}

// fetchApp uses the cfClient to retrieve an App entity and
// stores it in the internal cache
func (c *clientCacheWrap) fetchAppByGuid(guid string) (*cfclient.App, error) {
app, err := c.client.GetAppByGuid(guid)
resp := appResponse{
app: &app,
err: err,
}
timeout := time.Duration(0)
if err != nil {
return nil, err
// Cache nil, because is what we want to return when there was an error
resp.app = nil
timeout = c.errorTTL
}
c.cache.Put(app.Guid, &app)
return &app, nil
c.cache.PutWithTimeout(guid, &resp, timeout)
return resp.app, resp.err
}

// GetApp returns CF Application info, either from the cache or
// using the CF client.
func (c *clientCacheWrap) GetAppByGuid(guid string) (*cfclient.App, error) {
cachedApp := c.cache.Get(guid)
if cachedApp == nil {
cachedResp := c.cache.Get(guid)
if cachedResp == nil {
return c.fetchAppByGuid(guid)
}
app, ok := cachedApp.(*cfclient.App)
resp, ok := cachedResp.(*appResponse)
if !ok {
return nil, fmt.Errorf("error converting cached app")
return nil, fmt.Errorf("error converting cached app response (of type %T), this is likely a bug", cachedResp)
}
return app, nil
return resp.app, resp.err
}

// StartJanitor starts a goroutine that will periodically clean the applications cache.
Expand Down
77 changes: 77 additions & 0 deletions x-pack/libbeat/common/cloudfoundry/cache_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build integration
// +build cloudfoundry

package cloudfoundry

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cloudfoundry-community/go-cfclient"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test"
)

func TestGetApps(t *testing.T) {
var conf Config
err := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t)).Unpack(&conf)
require.NoError(t, err)

log := logp.NewLogger("cloudfoundry")
hub := NewHub(&conf, "filebeat", log)

client, err := hub.Client()
require.NoError(t, err)
apps, err := client.(*clientCacheWrap).client.(*cfclient.Client).ListApps()
require.NoError(t, err)

t.Logf("%d applications available", len(apps))

t.Run("request one of the available applications", func(t *testing.T) {
if len(apps) == 0 {
t.Skip("no apps in account?")
}
client, err := hub.Client()
require.NoError(t, err)

guid := apps[0].Guid
app, err := client.GetAppByGuid(guid)
assert.Equal(t, guid, app.Guid)
assert.NoError(t, err)
})

t.Run("handle error when application is not available", func(t *testing.T) {
client, err := hub.Client()
require.NoError(t, err)

testNotExists := func(t *testing.T) {
app, err := client.GetAppByGuid("notexists")
assert.Nil(t, app)
assert.True(t, cfclient.IsAppNotFoundError(err))
}

var firstTimeDuration time.Duration
t.Run("first call", func(t *testing.T) {
startTime := time.Now()
testNotExists(t)
firstTimeDuration = time.Now().Sub(startTime)
})

t.Run("second call, in cache, faster, same response", func(t *testing.T) {
for i := 0; i < 10; i++ {
startTime := time.Now()
testNotExists(t)
require.True(t, firstTimeDuration > time.Now().Sub(startTime))
}
})
})
}
34 changes: 24 additions & 10 deletions x-pack/libbeat/common/cloudfoundry/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package cloudfoundry

import (
"fmt"
"testing"
"time"

Expand All @@ -26,17 +25,27 @@ func TestClientCacheWrap(t *testing.T) {
Memory: 1, // use this field to track if from cache or from client
}
fakeClient := &fakeCFClient{app, 0}
cache := newClientCacheWrap(fakeClient, ttl, logp.NewLogger("cloudfoundry"))
cache := newClientCacheWrap(fakeClient, ttl, ttl, logp.NewLogger("cloudfoundry"))

missingAppGuid := mustCreateFakeGuid()

// should err; different app client doesn't have
_, err := cache.GetAppByGuid(mustCreateFakeGuid())
assert.Error(t, err)
one, err := cache.GetAppByGuid(missingAppGuid)
assert.Nil(t, one)
assert.True(t, cfclient.IsAppNotFoundError(err))
assert.Equal(t, 1, fakeClient.callCount)

// calling again; the miss should be cached
one, err = cache.GetAppByGuid(missingAppGuid)
assert.Nil(t, one)
assert.True(t, cfclient.IsAppNotFoundError(err))
assert.Equal(t, 1, fakeClient.callCount)

// fetched from client for the first time
one, err := cache.GetAppByGuid(guid)
one, err = cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, app, *one)
assert.Equal(t, 1, fakeClient.callCount)
assert.Equal(t, 2, fakeClient.callCount)

// updated app in fake client, new fetch should not have updated app
updatedApp := cfclient.App{
Expand All @@ -47,14 +56,14 @@ func TestClientCacheWrap(t *testing.T) {
two, err := cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, app, *two)
assert.Equal(t, 1, fakeClient.callCount)
assert.Equal(t, 2, fakeClient.callCount)

// wait the ttl, then it should have updated app
time.Sleep(ttl)
three, err := cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, updatedApp, *three)
assert.Equal(t, 2, fakeClient.callCount)
assert.Equal(t, 3, fakeClient.callCount)
}

type fakeCFClient struct {
Expand All @@ -63,10 +72,10 @@ type fakeCFClient struct {
}

func (f *fakeCFClient) GetAppByGuid(guid string) (cfclient.App, error) {
f.callCount++
if f.app.Guid != guid {
return f.app, fmt.Errorf("no app with guid")
return cfclient.App{}, notFoundError()
}
f.callCount++
return f.app, nil
}

Expand All @@ -77,3 +86,8 @@ func mustCreateFakeGuid() string {
}
return uuid.String()
}

// notFoundError returns a cloud foundry error that satisfies cfclient.IsAppNotFoundError(err)
func notFoundError() error {
return cfclient.CloudFoundryError{Code: 100004}
}
6 changes: 5 additions & 1 deletion x-pack/libbeat/common/cloudfoundry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ type Config struct {
// multiple filebeats will shard the load of receiving and sending events.
ShardID string `config:"shard_id"`

// Maximum amount of time to cache application objects from CF client
// Maximum amount of time to cache application objects from CF client.
CacheDuration time.Duration `config:"cache_duration"`

// Time to wait before retrying to get application info in case of error.
CacheRetryDelay time.Duration `config:"cache_retry_delay"`
}

// InitDefaults initialize the defaults for the configuration.
Expand All @@ -55,6 +58,7 @@ func (c *Config) InitDefaults() {
}
c.ShardID = uuid.String()
c.CacheDuration = 120 * time.Second
c.CacheRetryDelay = 20 * time.Second
c.Version = ConsumerVersionV1
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/libbeat/common/cloudfoundry/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (h *Hub) Client() (Client, error) {
if h.cfg.UaaAddress != "" {
cf.Endpoint.AuthEndpoint = h.cfg.UaaAddress
}
return newClientCacheWrap(cf, h.cfg.CacheDuration, h.log), nil
return newClientCacheWrap(cf, h.cfg.CacheDuration, h.cfg.CacheRetryDelay, h.log), nil
}

// RlpListener returns a listener client that calls the passed callback when the provided events are streamed through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (d *addCloudFoundryMetadata) Run(event *beat.Event) (*beat.Event, error) {
}
valI, err := event.GetValue("cloudfoundry.app.id")
if err != nil {
// doesn't have the required cf.app.id value to add more information
// doesn't have the required cloudfoundry.app.id value to add more information
return event, nil
}
val, _ := valI.(string)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package add_cloudfoundry_metadata

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -148,7 +147,7 @@ type fakeClient struct {

func (c *fakeClient) GetAppByGuid(guid string) (*cfclient.App, error) {
if c.app.Guid != guid {
return nil, fmt.Errorf("unknown app")
return nil, cfclient.CloudFoundryError{Code: 100004}
}
return &c.app, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ It has the following settings:

`client_secret`:: Client Secret to authenticate with Cloud Foundry.

`cache_duration`:: (Optional) Maximum amount of time to cache an application's metadata.
`cache_duration`:: (Optional) Maximum amount of time to cache an application's metadata. Defaults to 120 seconds.

`cache_retry_delay`:: (Optional) Time to wait before trying to obtain an application's metadata again in case of error. Defaults to 20 seconds.

`ssl`:: (Optional) SSL configuration to use when connecting to Cloud Foundry.

0 comments on commit ff64dee

Please sign in to comment.