diff --git a/beacon-chain/cache/BUILD.bazel b/beacon-chain/cache/BUILD.bazel index 2e9693e212ed..9e3c810a7c73 100644 --- a/beacon-chain/cache/BUILD.bazel +++ b/beacon-chain/cache/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//shared/params:go_default_library", "//shared/sliceutil:go_default_library", "@com_github_hashicorp_golang_lru//:go_default_library", + "@com_github_patrickmn_go_cache//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", diff --git a/beacon-chain/cache/committee_ids.go b/beacon-chain/cache/committee_ids.go index d61d863fa38e..fd9fbf37a4a6 100644 --- a/beacon-chain/cache/committee_ids.go +++ b/beacon-chain/cache/committee_ids.go @@ -2,17 +2,21 @@ package cache import ( "sync" + "time" lru "github.com/hashicorp/golang-lru" + "github.com/patrickmn/go-cache" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/sliceutil" ) type committeeIDs struct { - attester *lru.Cache - attesterLock sync.RWMutex - aggregator *lru.Cache - aggregatorLock sync.RWMutex + attester *lru.Cache + attesterLock sync.RWMutex + aggregator *lru.Cache + aggregatorLock sync.RWMutex + persistentSubnets *cache.Cache + subnetsLock sync.RWMutex } // CommitteeIDs for attester and aggregator. @@ -30,7 +34,10 @@ func newCommitteeIDs() *committeeIDs { if err != nil { panic(err) } - return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache} + epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) + subLength := epochDuration * time.Duration(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription) + persistentCache := cache.New(subLength*time.Second, epochDuration*time.Second) + return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache, persistentSubnets: persistentCache} } // AddAttesterCommiteeID adds committee ID for subscribing subnet for the attester of a given slot. @@ -85,3 +92,43 @@ func (c *committeeIDs) GetAggregatorCommitteeIDs(slot uint64) []uint64 { } return val.([]uint64) } + +// GetPersistentCommittees retrieves the persistent committee and expiration time of that validator's +// subscription. +func (c *committeeIDs) GetPersistentCommittees(pubkey []byte) ([]uint64, bool, time.Time) { + c.subnetsLock.RLock() + defer c.subnetsLock.RUnlock() + + id, duration, ok := c.persistentSubnets.GetWithExpiration(string(pubkey)) + if !ok { + return []uint64{}, ok, time.Time{} + } + return id.([]uint64), ok, duration +} + +// GetAllCommittees retrieves all the non-expired subscribed committees of all the validators +// in the cache. +func (c *committeeIDs) GetAllCommittees() []uint64 { + c.subnetsLock.RLock() + defer c.subnetsLock.RUnlock() + + itemsMap := c.persistentSubnets.Items() + committees := []uint64{} + + for _, v := range itemsMap { + if v.Expired() { + continue + } + committees = append(committees, v.Object.([]uint64)...) + } + return sliceutil.SetUint64(committees) +} + +// AddPersistentCommittee adds the relevant committee for that particular validator along with its +// expiration period. +func (c *committeeIDs) AddPersistentCommittee(pubkey []byte, comIndex []uint64, duration time.Duration) { + c.subnetsLock.Lock() + defer c.subnetsLock.Unlock() + + c.persistentSubnets.Set(string(pubkey), comIndex, duration) +} diff --git a/beacon-chain/cache/committee_ids_test.go b/beacon-chain/cache/committee_ids_test.go index 3ae97533a825..7b150fbc139f 100644 --- a/beacon-chain/cache/committee_ids_test.go +++ b/beacon-chain/cache/committee_ids_test.go @@ -54,3 +54,31 @@ func TestCommitteeIDCache_RoundTrip(t *testing.T) { t.Error("Expected equal value to return from cache") } } + +func TestCommitteeIDs_PersistentCommitteeRoundtrip(t *testing.T) { + pubkeySet := [][48]byte{} + c := newCommitteeIDs() + + for i := 0; i < 20; i++ { + pubkey := [48]byte{byte(i)} + pubkeySet = append(pubkeySet, pubkey) + c.AddPersistentCommittee(pubkey[:], []uint64{uint64(i)}, 0) + } + + for i := 0; i < 20; i++ { + pubkey := [48]byte{byte(i)} + + idxs, ok, _ := c.GetPersistentCommittees(pubkey[:]) + if !ok { + t.Errorf("Couldn't find entry in cache for pubkey %#x", pubkey) + continue + } + if int(idxs[0]) != i { + t.Fatalf("Wanted index of %d but got %d", i, idxs[0]) + } + } + coms := c.GetAllCommittees() + if len(coms) != 20 { + t.Errorf("Number of committees is not %d but is %d", 20, len(coms)) + } +} diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index d6ffc5cab8df..6c14d1429cb2 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -36,7 +36,6 @@ go_library( "//beacon-chain/cache:go_default_library", "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/state:go_default_library", - "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/p2p/connmgr:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", @@ -48,7 +47,6 @@ go_library( "//shared/p2putils:go_default_library", "//shared/params:go_default_library", "//shared/runutil:go_default_library", - "//shared/sliceutil:go_default_library", "//shared/traceutil:go_default_library", "@com_github_btcsuite_btcd//btcec:go_default_library", "@com_github_dgraph_io_ristretto//:go_default_library", diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 155469d9c9f0..181c33c589e5 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -55,7 +55,7 @@ type PubSubProvider interface { type PeerManager interface { Disconnect(peer.ID) error PeerID() peer.ID - RefreshENR(epoch uint64) + RefreshENR() FindPeersWithSubnet(index uint64) (bool, error) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index ed82ade4b738..6f21233fd974 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -33,14 +33,12 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" - "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/runutil" - "github.com/prysmaticlabs/prysm/shared/sliceutil" "github.com/sirupsen/logrus" ) @@ -280,8 +278,7 @@ func (s *Service) Start() { runutil.RunEvery(s.ctx, time.Hour, s.Peers().Decay) runutil.RunEvery(s.ctx, 10*time.Second, s.updateMetrics) runutil.RunEvery(s.ctx, refreshRate, func() { - currentEpoch := helpers.SlotToEpoch(helpers.SlotsSince(s.genesisTime)) - s.RefreshENR(currentEpoch) + s.RefreshENR() }) multiAddrs := s.host.Network().ListenAddresses() @@ -387,19 +384,13 @@ func (s *Service) MetadataSeq() uint64 { // RefreshENR uses an epoch to refresh the enr entry for our node // with the tracked committee id's for the epoch, allowing our node // to be dynamically discoverable by others given our tracked committee id's. -func (s *Service) RefreshENR(epoch uint64) { +func (s *Service) RefreshENR() { // return early if discv5 isnt running if s.dv5Listener == nil { return } bitV := bitfield.NewBitvector64() - - var committees []uint64 - epochStartSlot := helpers.StartSlot(epoch) - for i := epochStartSlot; i < epochStartSlot+2*params.BeaconConfig().SlotsPerEpoch; i++ { - committees = append(committees, sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(i), - cache.CommitteeIDs.GetAggregatorCommitteeIDs(i))...) - } + committees := cache.CommitteeIDs.GetAllCommittees() for _, idx := range committees { bitV.SetBitAt(idx, true) } diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index a7e3a3102f18..dfb6bb3585d6 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -125,7 +125,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) { metaData: &pb.MetaData{}, } cache.CommitteeIDs.AddAttesterCommiteeID(0, 10) - testService.RefreshENR(0) + testService.RefreshENR() time.Sleep(2 * time.Second) exists, err = s.FindPeersWithSubnet(2) diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index d568116c326a..7d9699854a24 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -255,7 +255,7 @@ func (p *TestP2P) FindPeersWithSubnet(index uint64) (bool, error) { } // RefreshENR mocks the p2p func. -func (p *TestP2P) RefreshENR(epoch uint64) { +func (p *TestP2P) RefreshENR() { return } diff --git a/beacon-chain/rpc/validator/BUILD.bazel b/beacon-chain/rpc/validator/BUILD.bazel index d3eef503d071..86c1f24af9d9 100644 --- a/beacon-chain/rpc/validator/BUILD.bazel +++ b/beacon-chain/rpc/validator/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//shared/featureconfig:go_default_library", "//shared/hashutil:go_default_library", "//shared/params:go_default_library", + "//shared/roughtime:go_default_library", "//shared/traceutil:go_default_library", "//shared/trieutil:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", diff --git a/beacon-chain/rpc/validator/assignments.go b/beacon-chain/rpc/validator/assignments.go index 19c372a166e2..39ca32ca672a 100644 --- a/beacon-chain/rpc/validator/assignments.go +++ b/beacon-chain/rpc/validator/assignments.go @@ -2,11 +2,16 @@ package validator import ( "context" + "math/rand" + "time" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/roughtime" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -74,6 +79,9 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth if ok { nextCommitteeIDs = append(nextCommitteeIDs, ca.CommitteeIndex) } + // Assign relevant validator to subnet. + assignValidatorToSubnet(pubKey, assignment.Status) + } else { // If the validator isn't in the beacon state, assume their status is unknown. assignment.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS @@ -90,3 +98,27 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth func (vs *Server) StreamDuties(stream ethpb.BeaconNodeValidator_StreamDutiesServer) error { return status.Error(codes.Unimplemented, "unimplemented") } + +// assignValidatorToSubnet checks the status and pubkey of a particular validator +// to discern whether persistent subnets need to be registered for them. +func assignValidatorToSubnet(pubkey []byte, status ethpb.ValidatorStatus) { + if status != ethpb.ValidatorStatus_ACTIVE && status != ethpb.ValidatorStatus_EXITING { + return + } + + _, ok, expTime := cache.CommitteeIDs.GetPersistentCommittees(pubkey) + if ok && expTime.After(roughtime.Now()) { + return + } + epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) + assignedIdxs := []uint64{} + for i := uint64(0); i < params.BeaconNetworkConfig().RandomSubnetsPerValidator; i++ { + assignedIndex := rand.Intn(int(params.BeaconNetworkConfig().AttestationSubnetCount)) + assignedIdxs = append(assignedIdxs, uint64(assignedIndex)) + } + assignedDuration := rand.Intn(int(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription)) + assignedDuration += int(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription) + + totalDuration := epochDuration * time.Duration(assignedDuration) + cache.CommitteeIDs.AddPersistentCommittee(pubkey, assignedIdxs, totalDuration*time.Second) +} diff --git a/beacon-chain/rpc/validator/assignments_test.go b/beacon-chain/rpc/validator/assignments_test.go index d39409b85f75..e82fbae3f8dc 100644 --- a/beacon-chain/rpc/validator/assignments_test.go +++ b/beacon-chain/rpc/validator/assignments_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" @@ -274,6 +275,26 @@ func TestGetDuties_SyncNotReady(t *testing.T) { } } +func TestAssignValidatorToSubnet(t *testing.T) { + k := pubKey(3) + + assignValidatorToSubnet(k, ethpb.ValidatorStatus_ACTIVE) + coms, ok, exp := cache.CommitteeIDs.GetPersistentCommittees(k) + if !ok { + t.Fatal("No cache entry found for validator") + } + if uint64(len(coms)) != params.BeaconNetworkConfig().RandomSubnetsPerValidator { + t.Errorf("Only %d committees subscribed when %d was needed.", len(coms), params.BeaconNetworkConfig().RandomSubnetsPerValidator) + } + epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) + totalTime := time.Duration(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription) * epochDuration * time.Second + receivedTime := exp.Round(time.Second).Sub(time.Now()) + if receivedTime < totalTime { + t.Fatalf("Expiration time of %f was less than expected duration of %f ", receivedTime.Seconds(), totalTime.Seconds()) + } + +} + func BenchmarkCommitteeAssignment(b *testing.B) { db := dbutil.SetupDB(b) diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 3bd2f2a1d045..1db859529be1 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -20,6 +20,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/p2putils" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/roughtime" + "github.com/prysmaticlabs/prysm/shared/sliceutil" "github.com/prysmaticlabs/prysm/shared/slotutil" "github.com/prysmaticlabs/prysm/shared/traceutil" "go.opencensus.io/trace" @@ -238,8 +239,14 @@ func (r *Service) subscribeDynamicWithSubnets( if r.chainStarted && r.initialSync.Syncing() { continue } + + // Persistent subscriptions from validators + persistentSubs := r.persistentCommitteeIndices() // Update desired topic indices for aggregator wantedSubs := r.aggregatorCommitteeIndices(currentSlot) + + // Combine subscriptions to get all requested subscriptions + wantedSubs = sliceutil.SetUint64(append(persistentSubs, wantedSubs...)) // Resize as appropriate. r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) diff --git a/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go b/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go index fe1a4580ee31..a33f0edd96f7 100644 --- a/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_committee_index_beacon_attestation.go @@ -54,6 +54,10 @@ func (r *Service) committeesCount() int { return int(helpers.SlotCommitteeCount(uint64(len(activeValidatorIndices)))) } +func (r *Service) persistentCommitteeIndices() []uint64 { + return cache.CommitteeIDs.GetAllCommittees() +} + func (r *Service) aggregatorCommitteeIndices(currentSlot uint64) []uint64 { endEpoch := helpers.SlotToEpoch(currentSlot) + 1 endSlot := endEpoch * params.BeaconConfig().SlotsPerEpoch diff --git a/shared/params/network_config.go b/shared/params/network_config.go index f471901531ff..30055baacab4 100644 --- a/shared/params/network_config.go +++ b/shared/params/network_config.go @@ -4,13 +4,15 @@ import "time" // NetworkConfig defines the spec based network parameters. type NetworkConfig struct { - GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages. - MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the the maximum allowed size of uncompressed req/resp chunked responses. - AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol. - AttestationPropagationSlotRange uint64 `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated. - TtfbTimeout time.Duration `yaml:"TTFB_TIMEOUT"` // TtfbTimeout is the maximum time to wait for first byte of request response (time-to-first-byte). - RespTimeout time.Duration `yaml:"RESP_TIMEOUT"` // RespTimeout is the maximum time for complete response transfer. - MaximumGossipClockDisparity time.Duration `yaml:"MAXIMUM_GOSSIP_CLOCK_DISPARITY"` // MaximumGossipClockDisparity is the maximum milliseconds of clock disparity assumed between honest nodes. + GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages. + MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the the maximum allowed size of uncompressed req/resp chunked responses. + AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol. + AttestationPropagationSlotRange uint64 `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated. + RandomSubnetsPerValidator uint64 `yaml:"RANDOM_SUBNETS_PER_VALIDATOR"` // RandomSubnetsPerValidator specifies the amount of subnets a validator has to be subscribed to at one time. + EpochsPerRandomSubnetSubscription uint64 `yaml:"EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION"` // EpochsPerRandomSubnetSubscription specifies the minimum duration a validator is connected to their subnet. + TtfbTimeout time.Duration `yaml:"TTFB_TIMEOUT"` // TtfbTimeout is the maximum time to wait for first byte of request response (time-to-first-byte). + RespTimeout time.Duration `yaml:"RESP_TIMEOUT"` // RespTimeout is the maximum time for complete response transfer. + MaximumGossipClockDisparity time.Duration `yaml:"MAXIMUM_GOSSIP_CLOCK_DISPARITY"` // MaximumGossipClockDisparity is the maximum milliseconds of clock disparity assumed between honest nodes. // DiscoveryV5 Config ETH2Key string // ETH2Key is the ENR key of the eth2 object in an enr. @@ -18,15 +20,17 @@ type NetworkConfig struct { } var defaultNetworkConfig = &NetworkConfig{ - GossipMaxSize: 1 << 20, // 1 MiB - MaxChunkSize: 1 << 20, // 1 MiB - AttestationSubnetCount: 64, - AttestationPropagationSlotRange: 32, - TtfbTimeout: 5 * time.Second, - RespTimeout: 10 * time.Second, - MaximumGossipClockDisparity: 500 * time.Millisecond, - ETH2Key: "eth2", - AttSubnetKey: "attnets", + GossipMaxSize: 1 << 20, // 1 MiB + MaxChunkSize: 1 << 20, // 1 MiB + AttestationSubnetCount: 64, + AttestationPropagationSlotRange: 32, + RandomSubnetsPerValidator: 1 << 0, + EpochsPerRandomSubnetSubscription: 1 << 8, + TtfbTimeout: 5 * time.Second, + RespTimeout: 10 * time.Second, + MaximumGossipClockDisparity: 500 * time.Millisecond, + ETH2Key: "eth2", + AttSubnetKey: "attnets", } // BeaconNetworkConfig returns the current network config for