Skip to content

Commit

Permalink
Add Persistent Subnets (#5734)
Browse files Browse the repository at this point in the history
* add params

* add changes

* bug fixes

* fix method

* get new assignments

* add test and comments

* change to slice of uint64

* add test

* lint

* Update beacon-chain/rpc/validator/assignments_test.go

* Update beacon-chain/cache/committee_ids.go

* Update beacon-chain/rpc/validator/assignments.go

* add comment

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored May 5, 2020
1 parent 1a27c21 commit 3a677ef
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 38 deletions.
1 change: 1 addition & 0 deletions beacon-chain/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
Expand Down
57 changes: 52 additions & 5 deletions beacon-chain/cache/committee_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package cache

import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/patrickmn/go-cache"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
)

type committeeIDs struct {
attester *lru.Cache
attesterLock sync.RWMutex
aggregator *lru.Cache
aggregatorLock sync.RWMutex
attester *lru.Cache
attesterLock sync.RWMutex
aggregator *lru.Cache
aggregatorLock sync.RWMutex
persistentSubnets *cache.Cache
subnetsLock sync.RWMutex
}

// CommitteeIDs for attester and aggregator.
Expand All @@ -30,7 +34,10 @@ func newCommitteeIDs() *committeeIDs {
if err != nil {
panic(err)
}
return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache}
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot)
subLength := epochDuration * time.Duration(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription)
persistentCache := cache.New(subLength*time.Second, epochDuration*time.Second)
return &committeeIDs{attester: attesterCache, aggregator: aggregatorCache, persistentSubnets: persistentCache}
}

// AddAttesterCommiteeID adds committee ID for subscribing subnet for the attester of a given slot.
Expand Down Expand Up @@ -85,3 +92,43 @@ func (c *committeeIDs) GetAggregatorCommitteeIDs(slot uint64) []uint64 {
}
return val.([]uint64)
}

// GetPersistentCommittees retrieves the persistent committee and expiration time of that validator's
// subscription.
func (c *committeeIDs) GetPersistentCommittees(pubkey []byte) ([]uint64, bool, time.Time) {
c.subnetsLock.RLock()
defer c.subnetsLock.RUnlock()

id, duration, ok := c.persistentSubnets.GetWithExpiration(string(pubkey))
if !ok {
return []uint64{}, ok, time.Time{}
}
return id.([]uint64), ok, duration
}

// GetAllCommittees retrieves all the non-expired subscribed committees of all the validators
// in the cache.
func (c *committeeIDs) GetAllCommittees() []uint64 {
c.subnetsLock.RLock()
defer c.subnetsLock.RUnlock()

itemsMap := c.persistentSubnets.Items()
committees := []uint64{}

for _, v := range itemsMap {
if v.Expired() {
continue
}
committees = append(committees, v.Object.([]uint64)...)
}
return sliceutil.SetUint64(committees)
}

// AddPersistentCommittee adds the relevant committee for that particular validator along with its
// expiration period.
func (c *committeeIDs) AddPersistentCommittee(pubkey []byte, comIndex []uint64, duration time.Duration) {
c.subnetsLock.Lock()
defer c.subnetsLock.Unlock()

c.persistentSubnets.Set(string(pubkey), comIndex, duration)
}
28 changes: 28 additions & 0 deletions beacon-chain/cache/committee_ids_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,31 @@ func TestCommitteeIDCache_RoundTrip(t *testing.T) {
t.Error("Expected equal value to return from cache")
}
}

func TestCommitteeIDs_PersistentCommitteeRoundtrip(t *testing.T) {
pubkeySet := [][48]byte{}
c := newCommitteeIDs()

for i := 0; i < 20; i++ {
pubkey := [48]byte{byte(i)}
pubkeySet = append(pubkeySet, pubkey)
c.AddPersistentCommittee(pubkey[:], []uint64{uint64(i)}, 0)
}

for i := 0; i < 20; i++ {
pubkey := [48]byte{byte(i)}

idxs, ok, _ := c.GetPersistentCommittees(pubkey[:])
if !ok {
t.Errorf("Couldn't find entry in cache for pubkey %#x", pubkey)
continue
}
if int(idxs[0]) != i {
t.Fatalf("Wanted index of %d but got %d", i, idxs[0])
}
}
coms := c.GetAllCommittees()
if len(coms) != 20 {
t.Errorf("Number of committees is not %d but is %d", 20, len(coms))
}
}
2 changes: 0 additions & 2 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ go_library(
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p/connmgr:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
Expand All @@ -48,7 +47,6 @@ go_library(
"//shared/p2putils:go_default_library",
"//shared/params:go_default_library",
"//shared/runutil:go_default_library",
"//shared/sliceutil:go_default_library",
"//shared/traceutil:go_default_library",
"@com_github_btcsuite_btcd//btcec:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type PubSubProvider interface {
type PeerManager interface {
Disconnect(peer.ID) error
PeerID() peer.ID
RefreshENR(epoch uint64)
RefreshENR()
FindPeersWithSubnet(index uint64) (bool, error)
AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error)
}
Expand Down
15 changes: 3 additions & 12 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -280,8 +278,7 @@ func (s *Service) Start() {
runutil.RunEvery(s.ctx, time.Hour, s.Peers().Decay)
runutil.RunEvery(s.ctx, 10*time.Second, s.updateMetrics)
runutil.RunEvery(s.ctx, refreshRate, func() {
currentEpoch := helpers.SlotToEpoch(helpers.SlotsSince(s.genesisTime))
s.RefreshENR(currentEpoch)
s.RefreshENR()
})

multiAddrs := s.host.Network().ListenAddresses()
Expand Down Expand Up @@ -387,19 +384,13 @@ func (s *Service) MetadataSeq() uint64 {
// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee id's for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee id's.
func (s *Service) RefreshENR(epoch uint64) {
func (s *Service) RefreshENR() {
// return early if discv5 isnt running
if s.dv5Listener == nil {
return
}
bitV := bitfield.NewBitvector64()

var committees []uint64
epochStartSlot := helpers.StartSlot(epoch)
for i := epochStartSlot; i < epochStartSlot+2*params.BeaconConfig().SlotsPerEpoch; i++ {
committees = append(committees, sliceutil.UnionUint64(cache.CommitteeIDs.GetAttesterCommitteeIDs(i),
cache.CommitteeIDs.GetAggregatorCommitteeIDs(i))...)
}
committees := cache.CommitteeIDs.GetAllCommittees()
for _, idx := range committees {
bitV.SetBitAt(idx, true)
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
metaData: &pb.MetaData{},
}
cache.CommitteeIDs.AddAttesterCommiteeID(0, 10)
testService.RefreshENR(0)
testService.RefreshENR()
time.Sleep(2 * time.Second)

exists, err = s.FindPeersWithSubnet(2)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (p *TestP2P) FindPeersWithSubnet(index uint64) (bool, error) {
}

// RefreshENR mocks the p2p func.
func (p *TestP2P) RefreshENR(epoch uint64) {
func (p *TestP2P) RefreshENR() {
return
}

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/traceutil:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
Expand Down
32 changes: 32 additions & 0 deletions beacon-chain/rpc/validator/assignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package validator

import (
"context"
"math/rand"
"time"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -74,6 +79,9 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth
if ok {
nextCommitteeIDs = append(nextCommitteeIDs, ca.CommitteeIndex)
}
// Assign relevant validator to subnet.
assignValidatorToSubnet(pubKey, assignment.Status)

} else {
// If the validator isn't in the beacon state, assume their status is unknown.
assignment.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
Expand All @@ -90,3 +98,27 @@ func (vs *Server) GetDuties(ctx context.Context, req *ethpb.DutiesRequest) (*eth
func (vs *Server) StreamDuties(stream ethpb.BeaconNodeValidator_StreamDutiesServer) error {
return status.Error(codes.Unimplemented, "unimplemented")
}

// assignValidatorToSubnet checks the status and pubkey of a particular validator
// to discern whether persistent subnets need to be registered for them.
func assignValidatorToSubnet(pubkey []byte, status ethpb.ValidatorStatus) {
if status != ethpb.ValidatorStatus_ACTIVE && status != ethpb.ValidatorStatus_EXITING {
return
}

_, ok, expTime := cache.CommitteeIDs.GetPersistentCommittees(pubkey)
if ok && expTime.After(roughtime.Now()) {
return
}
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot)
assignedIdxs := []uint64{}
for i := uint64(0); i < params.BeaconNetworkConfig().RandomSubnetsPerValidator; i++ {
assignedIndex := rand.Intn(int(params.BeaconNetworkConfig().AttestationSubnetCount))
assignedIdxs = append(assignedIdxs, uint64(assignedIndex))
}
assignedDuration := rand.Intn(int(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription))
assignedDuration += int(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription)

totalDuration := epochDuration * time.Duration(assignedDuration)
cache.CommitteeIDs.AddPersistentCommittee(pubkey, assignedIdxs, totalDuration*time.Second)
}
21 changes: 21 additions & 0 deletions beacon-chain/rpc/validator/assignments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/prysmaticlabs/prysm/beacon-chain/cache"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
Expand Down Expand Up @@ -274,6 +275,26 @@ func TestGetDuties_SyncNotReady(t *testing.T) {
}
}

func TestAssignValidatorToSubnet(t *testing.T) {
k := pubKey(3)

assignValidatorToSubnet(k, ethpb.ValidatorStatus_ACTIVE)
coms, ok, exp := cache.CommitteeIDs.GetPersistentCommittees(k)
if !ok {
t.Fatal("No cache entry found for validator")
}
if uint64(len(coms)) != params.BeaconNetworkConfig().RandomSubnetsPerValidator {
t.Errorf("Only %d committees subscribed when %d was needed.", len(coms), params.BeaconNetworkConfig().RandomSubnetsPerValidator)
}
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot)
totalTime := time.Duration(params.BeaconNetworkConfig().EpochsPerRandomSubnetSubscription) * epochDuration * time.Second
receivedTime := exp.Round(time.Second).Sub(time.Now())
if receivedTime < totalTime {
t.Fatalf("Expiration time of %f was less than expected duration of %f ", receivedTime.Seconds(), totalTime.Seconds())
}

}

func BenchmarkCommitteeAssignment(b *testing.B) {
db := dbutil.SetupDB(b)

Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -238,8 +239,14 @@ func (r *Service) subscribeDynamicWithSubnets(
if r.chainStarted && r.initialSync.Syncing() {
continue
}

// Persistent subscriptions from validators
persistentSubs := r.persistentCommitteeIndices()
// Update desired topic indices for aggregator
wantedSubs := r.aggregatorCommitteeIndices(currentSlot)

// Combine subscriptions to get all requested subscriptions
wantedSubs = sliceutil.SetUint64(append(persistentSubs, wantedSubs...))
// Resize as appropriate.
r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (r *Service) committeesCount() int {
return int(helpers.SlotCommitteeCount(uint64(len(activeValidatorIndices))))
}

func (r *Service) persistentCommitteeIndices() []uint64 {
return cache.CommitteeIDs.GetAllCommittees()
}

func (r *Service) aggregatorCommitteeIndices(currentSlot uint64) []uint64 {
endEpoch := helpers.SlotToEpoch(currentSlot) + 1
endSlot := endEpoch * params.BeaconConfig().SlotsPerEpoch
Expand Down
36 changes: 20 additions & 16 deletions shared/params/network_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,33 @@ import "time"

// NetworkConfig defines the spec based network parameters.
type NetworkConfig struct {
GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages.
MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the the maximum allowed size of uncompressed req/resp chunked responses.
AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol.
AttestationPropagationSlotRange uint64 `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated.
TtfbTimeout time.Duration `yaml:"TTFB_TIMEOUT"` // TtfbTimeout is the maximum time to wait for first byte of request response (time-to-first-byte).
RespTimeout time.Duration `yaml:"RESP_TIMEOUT"` // RespTimeout is the maximum time for complete response transfer.
MaximumGossipClockDisparity time.Duration `yaml:"MAXIMUM_GOSSIP_CLOCK_DISPARITY"` // MaximumGossipClockDisparity is the maximum milliseconds of clock disparity assumed between honest nodes.
GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages.
MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the the maximum allowed size of uncompressed req/resp chunked responses.
AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol.
AttestationPropagationSlotRange uint64 `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated.
RandomSubnetsPerValidator uint64 `yaml:"RANDOM_SUBNETS_PER_VALIDATOR"` // RandomSubnetsPerValidator specifies the amount of subnets a validator has to be subscribed to at one time.
EpochsPerRandomSubnetSubscription uint64 `yaml:"EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION"` // EpochsPerRandomSubnetSubscription specifies the minimum duration a validator is connected to their subnet.
TtfbTimeout time.Duration `yaml:"TTFB_TIMEOUT"` // TtfbTimeout is the maximum time to wait for first byte of request response (time-to-first-byte).
RespTimeout time.Duration `yaml:"RESP_TIMEOUT"` // RespTimeout is the maximum time for complete response transfer.
MaximumGossipClockDisparity time.Duration `yaml:"MAXIMUM_GOSSIP_CLOCK_DISPARITY"` // MaximumGossipClockDisparity is the maximum milliseconds of clock disparity assumed between honest nodes.

// DiscoveryV5 Config
ETH2Key string // ETH2Key is the ENR key of the eth2 object in an enr.
AttSubnetKey string // AttSubnetKey is the ENR key of the subnet bitfield in the enr.
}

var defaultNetworkConfig = &NetworkConfig{
GossipMaxSize: 1 << 20, // 1 MiB
MaxChunkSize: 1 << 20, // 1 MiB
AttestationSubnetCount: 64,
AttestationPropagationSlotRange: 32,
TtfbTimeout: 5 * time.Second,
RespTimeout: 10 * time.Second,
MaximumGossipClockDisparity: 500 * time.Millisecond,
ETH2Key: "eth2",
AttSubnetKey: "attnets",
GossipMaxSize: 1 << 20, // 1 MiB
MaxChunkSize: 1 << 20, // 1 MiB
AttestationSubnetCount: 64,
AttestationPropagationSlotRange: 32,
RandomSubnetsPerValidator: 1 << 0,
EpochsPerRandomSubnetSubscription: 1 << 8,
TtfbTimeout: 5 * time.Second,
RespTimeout: 10 * time.Second,
MaximumGossipClockDisparity: 500 * time.Millisecond,
ETH2Key: "eth2",
AttSubnetKey: "attnets",
}

// BeaconNetworkConfig returns the current network config for
Expand Down

0 comments on commit 3a677ef

Please sign in to comment.