Skip to content

Commit

Permalink
Validator smarter subscribe (#5334)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain authored Apr 7, 2020
1 parent c853805 commit b422889
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
9 changes: 9 additions & 0 deletions shared/bytesutil/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ func ToBytes48(x []byte) [48]byte {
return y
}

// ToBytes64 is a convenience method for converting a byte slice to a fix
// sized 64 byte array. This method will truncate the input if it is larger
// than 64 bytes.
func ToBytes64(x []byte) [64]byte {
var y [64]byte
copy(y[:], x)
return y
}

// ToBool is a convenience method for converting a byte to a bool.
// This method will use the first bit of the 0 byte to generate the returned value.
func ToBool(x byte) bool {
Expand Down
44 changes: 37 additions & 7 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
subscribeSlots := make([]uint64, 0, len(validatingKeys))
subscribeCommitteeIDs := make([]uint64, 0, len(validatingKeys))
subscribeIsAggregator := make([]bool, 0, len(validatingKeys))
alreadySubscribed := make(map[[64]byte]bool)
// Only log the full assignments output on epoch start to be less verbose.
// Also log out on first launch so the user doesn't have to wait a whole epoch to see their assignments.
if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived {
Expand All @@ -310,17 +311,28 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
}

if duty.Status == ethpb.ValidatorStatus_ACTIVE {
attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex

if duty.ProposerSlot > 0 {
lFields["proposerSlot"] = duty.ProposerSlot
}
lFields["attesterSlot"] = duty.AttesterSlot
lFields["attesterSlot"] = attesterSlot

alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex)
if _, ok := alreadySubscribed[alreadySubscribedKey]; ok {
continue
}

aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey))
aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey))
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
subscribeSlots = append(subscribeSlots, duty.AttesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex)
if aggregator {
alreadySubscribed[alreadySubscribedKey] = true
}
subscribeSlots = append(subscribeSlots, attesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, committeeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
}

Expand All @@ -338,12 +350,24 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived {
for _, duty := range dutiesNextEpoch.Duties {
if duty.Status == ethpb.ValidatorStatus_ACTIVE {
aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey))
attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex

alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex)
if _, ok := alreadySubscribed[alreadySubscribedKey]; ok {
continue
}

aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey))
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
subscribeSlots = append(subscribeSlots, duty.AttesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex)
if aggregator {
alreadySubscribed[alreadySubscribedKey] = true
}

subscribeSlots = append(subscribeSlots, attesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, committeeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
}
}
Expand Down Expand Up @@ -461,3 +485,9 @@ func (v *validator) domainData(ctx context.Context, epoch uint64, domain []byte)

return res, nil
}

// This constructs a validator subscribed key, it's used to track
// which subnet has already been pending requested.
func validatorSubscribeKey(slot uint64, committeeID uint64) [64]byte {
return bytesutil.ToBytes64(append(bytesutil.Bytes32(slot), bytesutil.Bytes32(committeeID)...))
}

0 comments on commit b422889

Please sign in to comment.