Skip to content

Commit

Permalink
slasher retrieve and cache validator public key (#5220)
Browse files Browse the repository at this point in the history
* cache and retrieval of validator public keys

* fix comments

* fix comment

* fix variables

* gaz

* ivan feedback fixes

* goimports

* fix test

* comments on in line slice update

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
  • Loading branch information
shayzluf and terencechain authored Apr 2, 2020
1 parent f385a1d commit 0df1226
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 5 deletions.
5 changes: 5 additions & 0 deletions slasher/beaconclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ go_library(
"receivers.go",
"service.go",
"submit.go",
"validator_retrieval.go",
],
importpath = "github.com/prysmaticlabs/prysm/slasher/beaconclient",
visibility = ["//slasher:__subpackages__"],
deps = [
"//shared/event:go_default_library",
"//shared/params:go_default_library",
"//slasher/cache:go_default_library",
"//slasher/db:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library",
Expand All @@ -40,18 +42,21 @@ go_test(
"receivers_test.go",
"service_test.go",
"submit_test.go",
"validator_retrieval_test.go",
],
embed = [":go_default_library"],
deps = [
"//shared/event:go_default_library",
"//shared/mock:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"//slasher/cache:go_default_library",
"//slasher/db/testing:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
14 changes: 11 additions & 3 deletions slasher/beaconclient/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ package beaconclient

import (
"context"
"errors"

middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/slasher/cache"
"github.com/prysmaticlabs/prysm/slasher/db"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
Expand Down Expand Up @@ -56,6 +57,7 @@ type Service struct {
proposerSlashingsFeed *event.Feed
receivedAttestationsBuffer chan *ethpb.IndexedAttestation
collectedAttestationsBuffer chan []*ethpb.IndexedAttestation
publicKeyCache *cache.PublicKeyCache
}

// Config options for the beaconclient service.
Expand All @@ -68,8 +70,13 @@ type Config struct {
}

// NewBeaconClientService instantiation.
func NewBeaconClientService(ctx context.Context, cfg *Config) *Service {
func NewBeaconClientService(ctx context.Context, cfg *Config) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
publicKeyCache, err := cache.NewPublicKeyCache(0, nil)
if err != nil {
return nil, errors.Wrap(err, "could not create new cache")
}

return &Service{
cert: cfg.BeaconCert,
ctx: ctx,
Expand All @@ -85,7 +92,8 @@ func NewBeaconClientService(ctx context.Context, cfg *Config) *Service {
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
}
publicKeyCache: publicKeyCache,
}, nil
}

// BlockFeed returns a feed other services in slasher can subscribe to
Expand Down
60 changes: 60 additions & 0 deletions slasher/beaconclient/validator_retrieval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package beaconclient

import (
"context"

"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"go.opencensus.io/trace"
)

// FindOrGetPublicKeys gets public keys from cache or request validators public
// keys from a beacon node via gRPC.
func (bs *Service) FindOrGetPublicKeys(
ctx context.Context,
validatorIndices []uint64,
) (map[uint64][]byte, error) {
ctx, span := trace.StartSpan(ctx, "beaconclient.FindOrGetPublicKeys")
defer span.End()

validators := make(map[uint64][]byte, len(validatorIndices))
notFound := 0
for _, validatorIdx := range validatorIndices {
pub, exists := bs.publicKeyCache.Get(validatorIdx)
if exists {
validators[validatorIdx] = pub
continue
}
// inline removal of cached elements from slice
validatorIndices[notFound] = validatorIdx
notFound++
}
// trim the slice to its new size
validatorIndices = validatorIndices[:notFound]

if len(validators) > 0 {
log.Tracef(
"Retrieved validators public keys from cache: %v",
validators,
)
}

if notFound == 0 {
return validators, nil
}
vc, err := bs.beaconClient.ListValidators(ctx, &ethpb.ListValidatorsRequest{
Indices: validatorIndices,
})
if err != nil {
return nil, errors.Wrapf(err, "could not request validators public key: %d", validatorIndices)
}
for _, v := range vc.ValidatorList {
validators[v.Index] = v.Validator.PublicKey
bs.publicKeyCache.Set(v.Index, v.Validator.PublicKey)
}
log.Tracef(
"Retrieved validators id public key map: %v",
validators,
)
return validators, nil
}
83 changes: 83 additions & 0 deletions slasher/beaconclient/validator_retrieval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package beaconclient

import (
"bytes"
"context"
"testing"

"github.com/golang/mock/gomock"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/mock"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/slasher/cache"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)

func TestService_RequestValidator(t *testing.T) {
hook := logTest.NewGlobal()
logrus.SetLevel(logrus.TraceLevel)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock.NewMockBeaconChainClient(ctrl)
validatorCache, err := cache.NewPublicKeyCache(0, nil)
if err != nil {
t.Fatalf("could not create new cache: %v", err)
}
bs := Service{
beaconClient: client,
publicKeyCache: validatorCache,
}
wanted := &ethpb.Validators{
ValidatorList: []*ethpb.Validators_ValidatorContainer{
{
Index: 0, Validator: &ethpb.Validator{PublicKey: []byte{1, 2, 3}},
},
{
Index: 1, Validator: &ethpb.Validator{PublicKey: []byte{2, 4, 5}},
},
},
}
wanted2 := &ethpb.Validators{
ValidatorList: []*ethpb.Validators_ValidatorContainer{
{
Index: 3, Validator: &ethpb.Validator{PublicKey: []byte{3, 4, 5}},
},
},
}
client.EXPECT().ListValidators(
gomock.Any(),
gomock.Any(),
).Return(wanted, nil)

client.EXPECT().ListValidators(
gomock.Any(),
gomock.Any(),
).Return(wanted2, nil)

// We request public key of validator id 0,1.
res, err := bs.FindOrGetPublicKeys(context.Background(), []uint64{0, 1})
if err != nil {
t.Fatal(err)
}
for i, v := range wanted.ValidatorList {
if !bytes.Equal(res[v.Index], wanted.ValidatorList[i].Validator.PublicKey) {
t.Errorf("Wanted %v, received %v", wanted, res)
}
}

testutil.AssertLogsContain(t, hook, "Retrieved validators id public key map:")
testutil.AssertLogsDoNotContain(t, hook, "Retrieved validators public keys from cache:")
// We expect public key of validator id 0 to be in cache.
res, err = bs.FindOrGetPublicKeys(context.Background(), []uint64{0, 3})
if err != nil {
t.Fatal(err)
}

for i, v := range wanted2.ValidatorList {
if !bytes.Equal(res[v.Index], wanted2.ValidatorList[i].Validator.PublicKey) {
t.Errorf("Wanted %v, received %v", wanted2, res)
}
}
testutil.AssertLogsContain(t, hook, "Retrieved validators public keys from cache: map[0:[1 2 3]]")
}
5 changes: 4 additions & 1 deletion slasher/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["span_cache.go"],
srcs = [
"span_cache.go",
"validators_cache.go",
],
importpath = "github.com/prysmaticlabs/prysm/slasher/cache",
visibility = ["//slasher:__subpackages__"],
deps = [
Expand Down
71 changes: 71 additions & 0 deletions slasher/cache/validators_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package cache

import (
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
// validatorsCacheSize defines the max number of validators public keys the cache can hold.
validatorsCacheSize = 300000
// Metrics for the validator cache.
validatorsCacheHit = promauto.NewCounter(prometheus.CounterOpts{
Name: "validators_cache_hit",
Help: "The total number of cache hits on the validators cache.",
})
validatorsCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
Name: "validators_cache_miss",
Help: "The total number of cache misses on the validators cache.",
})
)

// PublicKeyCache is used to store the public keys needed for signature verification.
type PublicKeyCache struct {
cache *lru.Cache
}

// NewPublicKeyCache initializes the cache.
func NewPublicKeyCache(size int, onEvicted func(key interface{}, value interface{})) (*PublicKeyCache, error) {
if size != 0 {
validatorsCacheSize = size
}
cache, err := lru.NewWithEvict(validatorsCacheSize, onEvicted)
if err != nil {
return nil, err
}
return &PublicKeyCache{cache: cache}, nil
}

// Get returns an ok bool and the cached value for the requested validator id key, if any.
func (c *PublicKeyCache) Get(validatorIdx uint64) ([]byte, bool) {
item, exists := c.cache.Get(validatorIdx)
if exists && item != nil {
validatorsCacheHit.Inc()
return item.([]byte), true
}

validatorsCacheMiss.Inc()
return nil, false
}

// Set the response in the cache.
func (c *PublicKeyCache) Set(validatorIdx uint64, publicKey []byte) {
_ = c.cache.Add(validatorIdx, publicKey)
}

// Delete removes a validator id from the cache and returns if it existed or not.
// Performs the onEviction function before removal.
func (c *PublicKeyCache) Delete(validatorIdx uint64) bool {
return c.cache.Remove(validatorIdx)
}

// Has returns true if the key exists in the cache.
func (c *PublicKeyCache) Has(validatorIdx uint64) bool {
return c.cache.Contains(validatorIdx)
}

// Clear removes all keys from the ValidatorCache.
func (c *PublicKeyCache) Clear() {
c.cache.Purge()
}
1 change: 1 addition & 0 deletions slasher/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//slasher/detection:go_default_library",
"//slasher/flags:go_default_library",
"//slasher/rpc:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@in_gopkg_urfave_cli_v2//:go_default_library",
],
Expand Down
6 changes: 5 additions & 1 deletion slasher/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"syscall"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/debug"
Expand Down Expand Up @@ -178,13 +179,16 @@ func (s *SlasherNode) registerBeaconClientService(ctx *cli.Context) error {
beaconProvider = flags.BeaconRPCProviderFlag.Value
}

bs := beaconclient.NewBeaconClientService(context.Background(), &beaconclient.Config{
bs, err := beaconclient.NewBeaconClientService(context.Background(), &beaconclient.Config{
BeaconCert: beaconCert,
SlasherDB: s.db,
BeaconProvider: beaconProvider,
AttesterSlashingsFeed: s.attesterSlashingsFeed,
ProposerSlashingsFeed: s.proposerSlashingsFeed,
})
if err != nil {
return errors.Wrap(err, "failed to initialize beacon client")
}
return s.services.RegisterService(bs)
}

Expand Down

0 comments on commit 0df1226

Please sign in to comment.