Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: tweaks to make staking more stable. #10097

Merged
merged 11 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions cl/beacon/handler/duties_attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ type attesterDutyResponse struct {
Slot uint64 `json:"slot,string"`
}

func (a *ApiHandler) getDependentRoot(s *state.CachingBeaconState, epoch uint64) libcommon.Hash {
dependentRootSlot := ((epoch - 1) * a.beaconChainCfg.SlotsPerEpoch) - 3
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
maxIterations := 2048
for i := 0; i < maxIterations; i++ {
if dependentRootSlot > epoch*a.beaconChainCfg.SlotsPerEpoch {
return libcommon.Hash{}
}

dependentRoot, err := s.GetBlockRootAtSlot(dependentRootSlot)
if err != nil {
dependentRootSlot--
continue
}
return dependentRoot
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
}
return libcommon.Hash{}
}

func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
epoch, err := beaconhttp.EpochFromRequest(r)
if err != nil {
Expand All @@ -31,15 +49,7 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
if s == nil {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, fmt.Errorf("node is syncing"))
}
dependentRootSlot := ((epoch - 1) * a.beaconChainCfg.SlotsPerEpoch) - 1
if dependentRootSlot > epoch*a.beaconChainCfg.SlotsPerEpoch {
dependentRootSlot = 0
}

dependentRoot, err := s.GetBlockRootAtSlot(dependentRootSlot)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Errorf("could not get dependent root: %w", err))
}
dependentRoot := a.getDependentRoot(s, epoch)

var idxsStr []string
if err := json.NewDecoder(r.Body).Decode(&idxsStr); err != nil {
Expand Down Expand Up @@ -78,7 +88,7 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, fmt.Errorf("node is syncing"))
}

if epoch > state.Epoch(s)+1 {
if epoch > state.Epoch(s)+3 {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Errorf("epoch %d is too far in the future", epoch))
}

Expand Down
6 changes: 1 addition & 5 deletions cl/beacon/handler/duties_proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ func (a *ApiHandler) getDutiesProposer(w http.ResponseWriter, r *http.Request) (
if s == nil {
return nil, beaconhttp.NewEndpointError(http.StatusServiceUnavailable, fmt.Errorf("node is syncing"))
}
dependentRoot, err := s.GetBlockRootAtSlot((epoch * a.beaconChainCfg.SlotsPerEpoch) - 1)
if err != nil {
return nil, err
}

dependentRoot := a.getDependentRoot(s, epoch)
if epoch < a.forkchoiceStore.FinalizedCheckpoint().Epoch() {
tx, err := a.indiciesDB.BeginRo(r.Context())
if err != nil {
Expand Down
55 changes: 53 additions & 2 deletions cl/phase1/network/services/attestation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/Giulio2002/bls"
Expand Down Expand Up @@ -35,10 +36,12 @@ type attestationService struct {
beaconCfg *clparams.BeaconChainConfig
netCfg *clparams.NetworkConfig
// validatorAttestationSeen maps from epoch to validator index. This is used to ignore duplicate validator attestations in the same epoch.
validatorAttestationSeen *lru.CacheWithTTL[uint64, uint64] // validator index -> epoch
validatorAttestationSeen *lru.CacheWithTTL[uint64, uint64] // validator index -> epoch
attestationsToBeLaterProcessed sync.Map
}

func NewAttestationService(
ctx context.Context,
forkchoiceStore forkchoice.ForkChoiceStorage,
committeeSubscribe committee_subscription.CommitteeSubscribe,
ethClock eth_clock.EthereumClock,
Expand All @@ -47,7 +50,7 @@ func NewAttestationService(
netCfg *clparams.NetworkConfig,
) AttestationService {
epochDuration := time.Duration(beaconCfg.SlotsPerEpoch*beaconCfg.SecondsPerSlot) * time.Second
return &attestationService{
a := &attestationService{
forkchoiceStore: forkchoiceStore,
committeeSubscribe: committeeSubscribe,
ethClock: ethClock,
Expand All @@ -56,6 +59,8 @@ func NewAttestationService(
netCfg: netCfg,
validatorAttestationSeen: lru.NewWithTTL[uint64, uint64]("validator_attestation_seen", validatorAttestationCacheSize, epochDuration),
}
go a.loop(ctx)
return a
}

func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, att *solid.Attestation) error {
Expand Down Expand Up @@ -161,6 +166,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
// [IGNORE] The block being voted for (attestation.data.beacon_block_root) has been seen (via both gossip and non-gossip sources)
// (a client MAY queue attestations for processing once block is retrieved).
if _, ok := s.forkchoiceStore.GetHeader(root); !ok {
s.scheduleAttestationForLaterProcessing(att)
return ErrIgnore
}

Expand All @@ -183,3 +189,48 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
}
return err
}

// attestationJob is a gossiped attestation queued for later processing
// because the block it votes for has not been seen by fork choice yet.
type attestationJob struct {
	att          *solid.Attestation // the attestation to retry once its block arrives
	creationTime time.Time          // enqueue time; jobs older than singleAttestationJobExpiry are dropped
	subnet       uint64             // NOTE(review): never set by scheduleAttestationForLaterProcessing — always zero; verify this is intended
}

// scheduleAttestationForLaterProcessing queues an attestation whose target
// block has not been seen yet, keyed by its SSZ hash, so the background loop
// can retry it once the block arrives.
func (a *attestationService) scheduleAttestationForLaterProcessing(att *solid.Attestation) {
	hash, err := att.HashSSZ()
	if err != nil {
		// Best-effort: an attestation we cannot hash is simply dropped.
		return
	}
	job := &attestationJob{
		att:          att,
		creationTime: time.Now(),
	}
	a.attestationsToBeLaterProcessed.Store(hash, job)
}

// loop periodically retries queued attestations whose target block was
// unknown at gossip time. Jobs are dropped once they exceed
// singleAttestationJobExpiry, and removed after a successful retry so they
// are not re-processed on every tick.
func (a *attestationService) loop(ctx context.Context) {
	ticker := time.NewTicker(singleAttestationIntervalTick)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		now := time.Now()
		a.attestationsToBeLaterProcessed.Range(func(key, value any) bool {
			k := key.([32]byte)
			v := value.(*attestationJob)
			// Expire jobs that have waited too long for their block.
			if now.After(v.creationTime.Add(singleAttestationJobExpiry)) {
				a.attestationsToBeLaterProcessed.Delete(k)
				return true
			}

			root := v.att.AttestantionData().BeaconBlockRoot()
			if _, ok := a.forkchoiceStore.GetHeader(root); !ok {
				// Block still unknown; keep the job for the next tick.
				return true
			}
			// Block arrived: process once and remove the job so it is not
			// retried on every subsequent tick until expiry (bug in the
			// original: the entry was never deleted after processing).
			a.ProcessMessage(ctx, &v.subnet, v.att)
			a.attestationsToBeLaterProcessed.Delete(k)
			return true
		})
	}
}
4 changes: 3 additions & 1 deletion cl/phase1/network/services/attestation_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ func (t *attestationTestSuite) SetupTest() {
netConfig := &clparams.NetworkConfig{}
computeSigningRoot = func(obj ssz.HashableSSZ, domain []byte) ([32]byte, error) { return [32]byte{}, nil }
blsVerify = func(sig []byte, msg []byte, pubKeys []byte) (bool, error) { return true, nil }
t.attService = NewAttestationService(t.mockForkChoice, t.committeeSubscibe, t.ethClock, t.syncedData, t.beaconConfig, netConfig)
ctx, cn := context.WithCancel(context.Background())
cn()
t.attService = NewAttestationService(ctx, t.mockForkChoice, t.committeeSubscibe, t.ethClock, t.syncedData, t.beaconConfig, netConfig)
}

func (t *attestationTestSuite) TearDownTest() {
Expand Down
2 changes: 2 additions & 0 deletions cl/phase1/network/services/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ const (
seenBlockCacheSize = 1000 // SeenBlockCacheSize is the size of the cache for seen blocks.
blockJobsIntervalTick = 50 * time.Millisecond
blobJobsIntervalTick = 5 * time.Millisecond
singleAttestationIntervalTick = 10 * time.Millisecond
attestationJobsIntervalTick = 100 * time.Millisecond
blockJobExpiry = 7 * time.Minute
blobJobExpiry = 7 * time.Minute
attestationJobExpiry = 30 * time.Minute
singleAttestationJobExpiry = 6 * time.Second
)

var (
Expand Down
46 changes: 16 additions & 30 deletions cl/validator/committee_subscription/committee_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type CommitteeSubscribeMgmt struct {
// subscriptions
aggregationPool aggregation.AggregationPool
validatorSubsMutex sync.RWMutex
validatorSubs map[uint64]map[uint64]*validatorSub // slot -> committeeIndex -> validatorSub
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
validatorSubs map[uint64]*validatorSub // slot -> committeeIndex -> validatorSub
}

func NewCommitteeSubscribeManagement(
Expand All @@ -63,23 +63,21 @@ func NewCommitteeSubscribeManagement(
state: state,
aggregationPool: aggregationPool,
syncedData: syncedData,
validatorSubs: make(map[uint64]map[uint64]*validatorSub),
validatorSubs: make(map[uint64]*validatorSub),
}
go c.sweepByStaleSlots(ctx)
return c
}

type validatorSub struct {
subnetId uint64
aggregate bool
validatorIdxs map[uint64]struct{}
subnetId uint64
aggregate bool
}

func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context, p *cltypes.BeaconCommitteeSubscription) error {
var (
slot = p.Slot
cIndex = p.CommitteeIndex
vIndex = p.ValidatorIndex
)
headState := c.syncedData.HeadState()
if headState == nil {
Expand All @@ -90,29 +88,22 @@ func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context,
subnetId := subnets.ComputeSubnetForAttestation(commiteePerSlot, slot, cIndex, c.beaconConfig.SlotsPerEpoch, c.netConfig.AttestationSubnetCount)
// add validator to subscription
c.validatorSubsMutex.Lock()
if _, ok := c.validatorSubs[slot]; !ok {
c.validatorSubs[slot] = make(map[uint64]*validatorSub)
}
if _, ok := c.validatorSubs[slot][cIndex]; !ok {
c.validatorSubs[slot][cIndex] = &validatorSub{

if _, ok := c.validatorSubs[cIndex]; !ok {
c.validatorSubs[cIndex] = &validatorSub{
subnetId: subnetId,
aggregate: p.IsAggregator,
validatorIdxs: map[uint64]struct{}{
vIndex: {},
},
}
} else {
if p.IsAggregator {
c.validatorSubs[slot][cIndex].aggregate = true
}
c.validatorSubs[slot][cIndex].validatorIdxs[vIndex] = struct{}{}
} else if p.IsAggregator {
c.validatorSubs[cIndex].aggregate = true
}

c.validatorSubsMutex.Unlock()

// set sentinel gossip expiration by subnet id
request := sentinel.RequestSubscribeExpiry{
Topic: gossip.TopicNameBeaconAttestation(subnetId),
ExpiryUnixSecs: uint64(time.Now().Add(24 * time.Hour).Unix()), // temporarily set to 24 hours
ExpiryUnixSecs: uint64(time.Now().Add(30 * time.Minute).Unix()), // temporarily set to 30 minutes
}
if _, err := c.sentinel.SetSubscribeExpiry(ctx, &request); err != nil {
return err
Expand All @@ -121,18 +112,13 @@ func (c *CommitteeSubscribeMgmt) AddAttestationSubscription(ctx context.Context,
}

func (c *CommitteeSubscribeMgmt) CheckAggregateAttestation(att *solid.Attestation) error {
var (
slot = att.AttestantionData().Slot()
committeeIndex = att.AttestantionData().CommitteeIndex()
)
committeeIndex := att.AttestantionData().CommitteeIndex()
c.validatorSubsMutex.RLock()
defer c.validatorSubsMutex.RUnlock()
if subs, ok := c.validatorSubs[slot]; ok {
if sub, ok := subs[committeeIndex]; ok && sub.aggregate {
// aggregate attestation
if err := c.aggregationPool.AddAttestation(att); err != nil {
return err
}
if sub, ok := c.validatorSubs[committeeIndex]; ok && sub.aggregate {
// aggregate attestation
if err := c.aggregationPool.AddAttestation(att); err != nil {
return err
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func RunCaplinPhase1(ctx context.Context, engine execution_client.ExecutionEngin
blockService := services.NewBlockService(ctx, indexDB, forkChoice, syncedDataManager, ethClock, beaconConfig, emitters)
blobService := services.NewBlobSidecarService(ctx, beaconConfig, forkChoice, syncedDataManager, ethClock, false)
syncCommitteeMessagesService := services.NewSyncCommitteeMessagesService(beaconConfig, ethClock, syncedDataManager, syncContributionPool, false)
attestationService := services.NewAttestationService(forkChoice, committeeSub, ethClock, syncedDataManager, beaconConfig, networkConfig)
attestationService := services.NewAttestationService(ctx, forkChoice, committeeSub, ethClock, syncedDataManager, beaconConfig, networkConfig)
syncContributionService := services.NewSyncContributionService(syncedDataManager, beaconConfig, syncContributionPool, ethClock, emitters, false)
aggregateAndProofService := services.NewAggregateAndProofService(ctx, syncedDataManager, forkChoice, beaconConfig, pool, false)
voluntaryExitService := services.NewVoluntaryExitService(pool, emitters, syncedDataManager, beaconConfig, ethClock)
Expand Down
Loading