mercury cache fixes (#11448)
* Properly pass through config

* Key cache by feed ID, NOT by req

req is a pointer, so every call supplies a distinct map key and subsequent calls
to LatestReport never hit the cache (see the sketch after this message)

* Remove logger from config
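
The keying bug in the second bullet is easy to reproduce in isolation. The following is a minimal standalone sketch, not the real code: a stand-in struct replaces *pb.LatestReportRequest, and hex.EncodeToString stands in for mercuryutils.BytesToFeedID(...).String(). Keyed by the request pointer, sync.Map.LoadOrStore never finds an existing entry because every call builds a fresh pointer; keyed by the feed ID string, every call after the first is a hit.

package main

import (
	"encoding/hex"
	"fmt"
	"sync"
)

// request stands in for *pb.LatestReportRequest; callers build a fresh
// value (and therefore a fresh pointer) for every LatestReport call.
type request struct{ FeedId [32]byte }

func main() {
	var byPointer, byFeedID sync.Map
	feed := [32]byte{1}

	for i := 1; i <= 3; i++ {
		req := &request{FeedId: feed} // a new pointer on every call

		// Keyed by the pointer: LoadOrStore never sees an existing entry,
		// because each distinct *request is a distinct map key.
		_, hitByPtr := byPointer.LoadOrStore(req, "cached report")

		// Keyed by the feed ID (a comparable string): every call after
		// the first finds the stored entry.
		_, hitByFeed := byFeedID.LoadOrStore(hex.EncodeToString(feed[:]), "cached report")

		fmt.Printf("call %d: pointer key hit=%v, feed ID key hit=%v\n", i, hitByPtr, hitByFeed)
	}
}

Only the feed-ID-keyed map ever reports a hit, which is what the switch to LoadOrStore(feedIDHex, ...) in the diff below addresses.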
samsondav authored Dec 1, 2023
1 parent eac8ebd commit e140618
Showing 8 changed files with 52 additions and 59 deletions.
1 change: 0 additions & 1 deletion core/cmd/shell.go
@@ -160,7 +160,6 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
loopRegistry := plugins.NewLoopRegistry(appLggr, cfg.Tracing())

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
- Logger: appLggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
1 change: 0 additions & 1 deletion core/internal/cltest/cltest.go
@@ -345,7 +345,6 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
loopRegistry := plugins.NewLoopRegistry(lggr, nil)

mercuryPool := wsrpc.NewPool(lggr, cache.Config{
- Logger: lggr,
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
53 changes: 26 additions & 27 deletions core/services/relay/evm/mercury/wsrpc/cache/cache.go
@@ -63,7 +63,6 @@ type Cache interface {
}

type Config struct {
- Logger logger.Logger
// LatestReportTTL controls how "stale" we will allow a price to be e.g. if
// set to 1s, a new price will always be fetched if the last result was
// from more than 1 second ago.
@@ -84,8 +83,8 @@ type Config struct {
LatestReportDeadline time.Duration
}

- func NewCache(client Client, cfg Config) Cache {
- return newMemCache(client, cfg)
+ func NewCache(lggr logger.Logger, client Client, cfg Config) Cache {
+ return newMemCache(lggr, client, cfg)
}

type cacheVal struct {
@@ -164,24 +163,20 @@ type memCache struct {

client Client

- latestPriceTTL time.Duration
- maxStaleAge time.Duration
- latestReportDeadline time.Duration
+ cfg Config

cache sync.Map

wg sync.WaitGroup
chStop services.StopChan
}

- func newMemCache(client Client, cfg Config) *memCache {
+ func newMemCache(lggr logger.Logger, client Client, cfg Config) *memCache {
return &memCache{
services.StateMachine{},
- cfg.Logger.Named("MercuryMemCache"),
+ lggr.Named("MemCache"),
client,
- cfg.LatestReportTTL,
- cfg.MaxStaleAge,
- cfg.LatestReportDeadline,
+ cfg,
sync.Map{},
sync.WaitGroup{},
make(chan (struct{})),
@@ -197,10 +192,11 @@ func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest
if req == nil {
return nil, errors.New("req must not be nil")
}
- if m.latestPriceTTL <= 0 {
+ feedIDHex := mercuryutils.BytesToFeedID(req.FeedId).String()
+ if m.cfg.LatestReportTTL <= 0 {
return m.client.RawClient().LatestReport(ctx, req)
}
- vi, _ := m.cache.LoadOrStore(req, &cacheVal{
+ vi, loaded := m.cache.LoadOrStore(feedIDHex, &cacheVal{
sync.RWMutex{},
false,
nil,
@@ -210,44 +206,46 @@ func (m *memCache) LatestReport(ctx context.Context, req *pb.LatestReportRequest
})
v := vi.(*cacheVal)

+ m.lggr.Tracew("LatestReport", "feedID", feedIDHex, "loaded", loaded)
+
// HOT PATH
v.RLock()
if time.Now().Before(v.expiresAt) {
// CACHE HIT
- promCacheHitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
+ promCacheHitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()

defer v.RUnlock()
return v.val, nil
} else if v.fetching {
// CACHE WAIT
- promCacheWaitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
+ promCacheWaitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// if someone else is fetching then wait for the fetch to complete
ch := v.fetchCh
v.RUnlock()
return v.waitForResult(ctx, ch, m.chStop)
}
// CACHE MISS
- promCacheMissCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
+ promCacheMissCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// fallthrough to cold path and fetch
v.RUnlock()

// COLD PATH
v.Lock()
if time.Now().Before(v.expiresAt) {
// CACHE HIT
- promCacheHitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
+ promCacheHitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
defer v.RUnlock()
return v.val, nil
} else if v.fetching {
// CACHE WAIT
- promCacheWaitCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
+ promCacheWaitCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// if someone else is fetching then wait for the fetch to complete
ch := v.fetchCh
v.Unlock()
return v.waitForResult(ctx, ch, m.chStop)
}
// CACHE MISS
- promCacheMissCount.WithLabelValues(m.client.ServerURL(), mercuryutils.BytesToFeedID(req.FeedId).String()).Inc()
+ promCacheMissCount.WithLabelValues(m.client.ServerURL(), feedIDHex).Inc()
// initiate the fetch and wait for result
ch := v.initiateFetch()
v.Unlock()
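
For readers skimming the hunk above: LatestReport does a double-checked TTL lookup, taking the read lock on the hot path and escalating to the write lock only when the cached value has expired. Below is a condensed standalone sketch of that shape, simplified: no fetch-coordination channel, no Prometheus counters, and the refresh runs synchronously under the write lock.

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry mirrors the relevant shape of cacheVal: a value plus an expiry,
// guarded by an RWMutex so concurrent readers share the hot path.
type entry struct {
	sync.RWMutex
	val       string
	expiresAt time.Time
}

func (e *entry) get(refresh func() string, ttl time.Duration) string {
	// HOT PATH: cheap, read-locked freshness check.
	e.RLock()
	if time.Now().Before(e.expiresAt) {
		defer e.RUnlock()
		return e.val
	}
	e.RUnlock()

	// COLD PATH: re-check under the write lock, since another goroutine
	// may have refreshed the entry while we were waiting for it.
	e.Lock()
	defer e.Unlock()
	if time.Now().Before(e.expiresAt) {
		return e.val
	}
	e.val = refresh()
	e.expiresAt = time.Now().Add(ttl)
	return e.val
}

func main() {
	e := &entry{}
	fetch := func() string { fmt.Println("fetching"); return "report" }
	fmt.Println(e.get(fetch, time.Second)) // cold path: fetches once
	fmt.Println(e.get(fetch, time.Second)) // hot path: served from cache
}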
@@ -269,7 +267,7 @@ const minBackoffRetryInterval = 50 * time.Millisecond
// newBackoff creates a backoff for retrying
func (m *memCache) newBackoff() backoff.Backoff {
min := minBackoffRetryInterval
- max := m.latestPriceTTL / 2
+ max := m.cfg.LatestReportTTL / 2
if min > max {
// avoid setting a min that is greater than max
min = max
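
The clamp in newBackoff only matters for very small TTLs: max is half the report TTL, and min is lowered to max whenever the 50ms floor would exceed it. A small sketch of just that bounds calculation (the TTL values here are illustrative):

package main

import (
	"fmt"
	"time"
)

const minBackoffRetryInterval = 50 * time.Millisecond

// backoffBounds reproduces the min/max selection shown above: max is half
// the report TTL, and min is clamped so it never exceeds max.
func backoffBounds(latestReportTTL time.Duration) (min, max time.Duration) {
	min = minBackoffRetryInterval
	max = latestReportTTL / 2
	if min > max {
		min = max
	}
	return min, max
}

func main() {
	for _, ttl := range []time.Duration{10 * time.Millisecond, time.Second} {
		min, max := backoffBounds(ttl)
		fmt.Printf("TTL=%v -> min=%v, max=%v\n", ttl, min, max)
	}
}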
@@ -293,16 +291,16 @@ func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) {
var val *pb.LatestReportResponse
var err error
defer func() {
- v.completeFetch(val, err, t.Add(m.latestPriceTTL))
+ v.completeFetch(val, err, t.Add(m.cfg.LatestReportTTL))
}()

for {
t = time.Now()

ctx := memcacheCtx
cancel := func() {}
- if m.latestReportDeadline > 0 {
- ctx, cancel = context.WithTimeoutCause(memcacheCtx, m.latestReportDeadline, errors.New("latest report fetch deadline exceeded"))
+ if m.cfg.LatestReportDeadline > 0 {
+ ctx, cancel = context.WithTimeoutCause(memcacheCtx, m.cfg.LatestReportDeadline, errors.New("latest report fetch deadline exceeded"))
}

// NOTE: must drop down to RawClient here otherwise we enter an
@@ -330,6 +328,7 @@ func (m *memCache) fetch(req *pb.LatestReportRequest, v *cacheVal) {

func (m *memCache) Start(context.Context) error {
return m.StartOnce(m.Name(), func() error {
+ m.lggr.Debugw("MemCache starting", "config", m.cfg)
m.wg.Add(1)
go m.runloop()
return nil
@@ -339,16 +338,16 @@ func (m *memCache) Start(context.Context) error {
func (m *memCache) runloop() {
defer m.wg.Done()

- if m.maxStaleAge == 0 {
+ if m.cfg.MaxStaleAge == 0 {
return
}
- t := time.NewTicker(utils.WithJitter(m.maxStaleAge))
+ t := time.NewTicker(utils.WithJitter(m.cfg.MaxStaleAge))

for {
select {
case <-t.C:
m.cleanup()
- t.Reset(utils.WithJitter(m.maxStaleAge))
+ t.Reset(utils.WithJitter(m.cfg.MaxStaleAge))
case <-m.chStop:
return
}
@@ -372,7 +371,7 @@ func (m *memCache) cleanup() {
// skip cleanup if fetching
return true
}
- if time.Now().After(v.expiresAt.Add(m.maxStaleAge)) {
+ if time.Now().After(v.expiresAt.Add(m.cfg.MaxStaleAge)) {
// garbage collection
m.cache.Delete(k)
}
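
Taken together, the runloop and cleanup hunks above implement a jittered ticker that periodically garbage-collects entries which have been expired for longer than MaxStaleAge, skipping any entry with a fetch in flight. A condensed standalone sketch follows; withJitter is an assumed stand-in for utils.WithJitter, and item carries only the two fields cleanup inspects.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// withJitter is a stand-in for utils.WithJitter: roughly +/-10% around d.
func withJitter(d time.Duration) time.Duration {
	return d - d/10 + time.Duration(rand.Int63n(int64(d)/5))
}

// item carries just the fields cleanup inspects on each entry.
type item struct {
	fetching  bool
	expiresAt time.Time
}

// run sketches the runloop/cleanup pair: tick with jitter, then delete
// entries that have been expired for longer than maxStaleAge.
func run(cache *sync.Map, maxStaleAge time.Duration, stop <-chan struct{}) {
	if maxStaleAge == 0 {
		return // cleanup disabled; entries are never evicted
	}
	t := time.NewTicker(withJitter(maxStaleAge))
	defer t.Stop()
	for {
		select {
		case <-t.C:
			cache.Range(func(k, vi any) bool {
				v := vi.(*item)
				if v.fetching {
					return true // never evict an entry with a fetch in flight
				}
				if time.Now().After(v.expiresAt.Add(maxStaleAge)) {
					cache.Delete(k)
				}
				return true
			})
			t.Reset(withJitter(maxStaleAge))
		case <-stop:
			return
		}
	}
}

func main() {
	var cache sync.Map
	cache.Store("0x01", &item{expiresAt: time.Now().Add(-time.Hour)})

	stop := make(chan struct{})
	go run(&cache, 50*time.Millisecond, stop)

	time.Sleep(200 * time.Millisecond)
	close(stop)

	if _, ok := cache.Load("0x01"); !ok {
		fmt.Println("stale entry was garbage collected")
	}
}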
23 changes: 8 additions & 15 deletions core/services/relay/evm/mercury/wsrpc/cache/cache_set.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"sync"
"time"

"golang.org/x/exp/maps"

@@ -28,27 +27,26 @@ type cacheSet struct {
lggr logger.Logger
caches map[string]Cache

- latestPriceTTL time.Duration
- maxStaleAge time.Duration
+ cfg Config
}

- func NewCacheSet(cfg Config) CacheSet {
- return newCacheSet(cfg)
+ func NewCacheSet(lggr logger.Logger, cfg Config) CacheSet {
+ return newCacheSet(lggr, cfg)
}

- func newCacheSet(cfg Config) *cacheSet {
+ func newCacheSet(lggr logger.Logger, cfg Config) *cacheSet {
return &cacheSet{
sync.RWMutex{},
services.StateMachine{},
- cfg.Logger.Named("CacheSet"),
+ lggr.Named("CacheSet"),
make(map[string]Cache),
- cfg.LatestReportTTL,
- cfg.MaxStaleAge,
+ cfg,
}
}

func (cs *cacheSet) Start(context.Context) error {
return cs.StartOnce("CacheSet", func() error {
+ cs.lggr.Debugw("CacheSet starting", "config", cs.cfg)
return nil
})
}
@@ -93,12 +91,7 @@ func (cs *cacheSet) get(ctx context.Context, client Client) (Fetcher, error) {
if exists {
return c, nil
}
- cfg := Config{
- Logger: cs.lggr.With("serverURL", sURL),
- LatestReportTTL: cs.latestPriceTTL,
- MaxStaleAge: cs.maxStaleAge,
- }
- c = newMemCache(client, cfg)
+ c = newMemCache(cs.lggr, client, cs.cfg)
if err := c.Start(ctx); err != nil {
return nil, err
}
core/services/relay/evm/mercury/wsrpc/cache/cache_set_test.go
@@ -12,7 +12,7 @@ import (

func Test_CacheSet(t *testing.T) {
lggr := logger.TestLogger(t)
- cs := newCacheSet(Config{Logger: lggr})
+ cs := newCacheSet(lggr, Config{})
ctx := testutils.Context(t)
require.NoError(t, cs.Start(ctx))
t.Cleanup(func() {
22 changes: 12 additions & 10 deletions core/services/relay/evm/mercury/wsrpc/cache/cache_test.go
@@ -12,31 +12,33 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
+ mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb"
)

const neverExpireTTL = 1000 * time.Hour // some massive value that will never expire during a test

func Test_Cache(t *testing.T) {
+ lggr := logger.TestLogger(t)
client := &mockClient{}
- cfg := Config{
- Logger: logger.TestLogger(t),
- }
+ cfg := Config{}
ctx := testutils.Context(t)

req1 := &pb.LatestReportRequest{FeedId: []byte{1}}
req2 := &pb.LatestReportRequest{FeedId: []byte{2}}
req3 := &pb.LatestReportRequest{FeedId: []byte{3}}

+ feedID1Hex := mercuryutils.BytesToFeedID(req1.FeedId).String()
+
t.Run("errors with nil req", func(t *testing.T) {
- c := newMemCache(client, cfg)
+ c := newMemCache(lggr, client, cfg)

_, err := c.LatestReport(ctx, nil)
assert.EqualError(t, err, "req must not be nil")
})

t.Run("with LatestReportTTL=0 does no caching", func(t *testing.T) {
- c := newMemCache(client, cfg)
+ c := newMemCache(lggr, client, cfg)

req := &pb.LatestReportRequest{}
for i := 0; i < 5; i++ {
@@ -58,7 +60,7 @@ func Test_Cache(t *testing.T) {
t.Run("caches repeated calls to LatestReport, keyed by request", func(t *testing.T) {
cfg.LatestReportTTL = neverExpireTTL
client.err = nil
- c := newMemCache(client, cfg)
+ c := newMemCache(lggr, client, cfg)

t.Run("if cache is unstarted, returns error", func(t *testing.T) {
// starting the cache is required for state management if we
@@ -122,8 +124,8 @@ func Test_Cache(t *testing.T) {
})

t.Run("re-queries when a cache item has expired", func(t *testing.T) {
- vi, exists := c.cache.Load(req1)
- assert.True(t, exists)
+ vi, exists := c.cache.Load(feedID1Hex)
+ require.True(t, exists)
v := vi.(*cacheVal)
v.expiresAt = time.Now().Add(-1 * time.Second)

@@ -167,7 +169,7 @@ func Test_Cache(t *testing.T) {
})

t.Run("timeouts", func(t *testing.T) {
- c := newMemCache(client, cfg)
+ c := newMemCache(lggr, client, cfg)
// simulate fetch already executing in background
v := &cacheVal{
fetching: true,
@@ -176,7 +178,7 @@ func Test_Cache(t *testing.T) {
err: nil,
expiresAt: time.Now().Add(-1 * time.Second),
}
- c.cache.Store(req1, v)
+ c.cache.Store(feedID1Hex, v)

canceledCtx, cancel := context.WithCancel(testutils.Context(t))
cancel()
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/wsrpc/client_test.go
@@ -140,7 +140,7 @@ func Test_Client_LatestReport(t *testing.T) {

t.Run("with cache disabled", func(t *testing.T) {
req := &pb.LatestReportRequest{}
- cacheSet := cache.NewCacheSet(cache.Config{LatestReportTTL: 0, Logger: lggr})
+ cacheSet := cache.NewCacheSet(lggr, cache.Config{LatestReportTTL: 0})
resp := &pb.LatestReportResponse{}

var calls int
@@ -178,7 +178,7 @@
t.Run("with caching", func(t *testing.T) {
req := &pb.LatestReportRequest{}
const neverExpireTTL = 1000 * time.Hour // some massive value that will never expire during a test
- cacheSet := cache.NewCacheSet(cache.Config{LatestReportTTL: neverExpireTTL, Logger: lggr})
+ cacheSet := cache.NewCacheSet(lggr, cache.Config{LatestReportTTL: neverExpireTTL})
resp := &pb.LatestReportResponse{}

var calls int
5 changes: 3 additions & 2 deletions core/services/relay/evm/mercury/wsrpc/pool.go
@@ -130,9 +130,10 @@ type pool struct {
}

func NewPool(lggr logger.Logger, cacheCfg cache.Config) Pool {
- p := newPool(lggr.Named("Mercury.WSRPCPool"))
+ lggr = lggr.Named("Mercury.WSRPCPool")
+ p := newPool(lggr)
p.newClient = NewClient
- p.cacheSet = cache.NewCacheSet(cacheCfg)
+ p.cacheSet = cache.NewCacheSet(lggr, cacheCfg)
return p
}
