diff --git a/shared/bytesutil/bytes.go b/shared/bytesutil/bytes.go index 0c3e1df8816e..abdab6d970f6 100644 --- a/shared/bytesutil/bytes.go +++ b/shared/bytesutil/bytes.go @@ -133,6 +133,15 @@ func ToBytes48(x []byte) [48]byte { return y } +// ToBytes64 is a convenience method for converting a byte slice to a fix +// sized 64 byte array. This method will truncate the input if it is larger +// than 64 bytes. +func ToBytes64(x []byte) [64]byte { + var y [64]byte + copy(y[:], x) + return y +} + // ToBool is a convenience method for converting a byte to a bool. // This method will use the first bit of the 0 byte to generate the returned value. func ToBool(x byte) bool { diff --git a/validator/client/validator.go b/validator/client/validator.go index 3cc9261d2422..421a1274fed4 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -292,6 +292,7 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { subscribeSlots := make([]uint64, 0, len(validatingKeys)) subscribeCommitteeIDs := make([]uint64, 0, len(validatingKeys)) subscribeIsAggregator := make([]bool, 0, len(validatingKeys)) + alreadySubscribed := make(map[[64]byte]bool) // Only log the full assignments output on epoch start to be less verbose. // Also log out on first launch so the user doesn't have to wait a whole epoch to see their assignments. if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived { @@ -310,17 +311,28 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { } if duty.Status == ethpb.ValidatorStatus_ACTIVE { + attesterSlot := duty.AttesterSlot + committeeIndex := duty.CommitteeIndex + if duty.ProposerSlot > 0 { lFields["proposerSlot"] = duty.ProposerSlot } - lFields["attesterSlot"] = duty.AttesterSlot + lFields["attesterSlot"] = attesterSlot + + alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex) + if _, ok := alreadySubscribed[alreadySubscribedKey]; ok { + continue + } - aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) + aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey)) if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } - subscribeSlots = append(subscribeSlots, duty.AttesterSlot) - subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex) + if aggregator { + alreadySubscribed[alreadySubscribedKey] = true + } + subscribeSlots = append(subscribeSlots, attesterSlot) + subscribeCommitteeIDs = append(subscribeCommitteeIDs, committeeIndex) subscribeIsAggregator = append(subscribeIsAggregator, aggregator) } @@ -338,12 +350,24 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived { for _, duty := range dutiesNextEpoch.Duties { if duty.Status == ethpb.ValidatorStatus_ACTIVE { - aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) + attesterSlot := duty.AttesterSlot + committeeIndex := duty.CommitteeIndex + + alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex) + if _, ok := alreadySubscribed[alreadySubscribedKey]; ok { + continue + } + + aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey)) if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } - subscribeSlots = append(subscribeSlots, duty.AttesterSlot) - subscribeCommitteeIDs = append(subscribeCommitteeIDs, duty.CommitteeIndex) + if aggregator { + alreadySubscribed[alreadySubscribedKey] = true + } + + subscribeSlots = append(subscribeSlots, attesterSlot) + subscribeCommitteeIDs = append(subscribeCommitteeIDs, committeeIndex) subscribeIsAggregator = append(subscribeIsAggregator, aggregator) } } @@ -461,3 +485,9 @@ func (v *validator) domainData(ctx context.Context, epoch uint64, domain []byte) return res, nil } + +// This constructs a validator subscribed key, it's used to track +// which subnet has already been pending requested. +func validatorSubscribeKey(slot uint64, committeeID uint64) [64]byte { + return bytesutil.ToBytes64(append(bytesutil.Bytes32(slot), bytesutil.Bytes32(committeeID)...)) +}