Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More concise duty logging #5397

Merged
merged 14 commits into from
Apr 22, 2020
75 changes: 53 additions & 22 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (v *validator) checkAndLogValidatorStatus(validatorStatuses []*ethpb.Valida
"status": status.Status.Status.String(),
})
if v.emitAccountMetrics {
fmtKey := fmt.Sprintf("%#x", status.PublicKey[:])
fmtKey := fmt.Sprintf("%#x", status.PublicKey)
validatorStatusesGaugeVec.WithLabelValues(fmtKey).Set(float64(status.Status.Status))
}
switch status.Status.Status {
Expand Down Expand Up @@ -324,34 +324,17 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
}

v.duties = resp
v.logDuties(slot, v.duties.Duties)
subscribeSlots := make([]uint64, 0, len(validatingKeys))
subscribeCommitteeIDs := make([]uint64, 0, len(validatingKeys))
subscribeIsAggregator := make([]bool, 0, len(validatingKeys))
alreadySubscribed := make(map[[64]byte]bool)

for _, duty := range v.duties.Duties {
lFields := logrus.Fields{
"pubKey": fmt.Sprintf("%#x", bytesutil.Trunc(duty.PublicKey)),
"validatorIndex": duty.ValidatorIndex,
"committeeIndex": duty.CommitteeIndex,
"epoch": slot / params.BeaconConfig().SlotsPerEpoch,
"status": duty.Status,
}

if v.emitAccountMetrics {
fmtKey := fmt.Sprintf("%#x", duty.PublicKey[:])
validatorStatusesGaugeVec.WithLabelValues(fmtKey).Set(float64(duty.Status))
}

if duty.Status == ethpb.ValidatorStatus_ACTIVE {
if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING {
attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex

if len(duty.ProposerSlots) > 0 {
lFields["proposerSlots"] = duty.ProposerSlots
}
lFields["attesterSlot"] = attesterSlot

alreadySubscribedKey := validatorSubscribeKey(attesterSlot, committeeIndex)
if _, ok := alreadySubscribed[alreadySubscribedKey]; ok {
continue
Expand All @@ -364,10 +347,10 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
if aggregator {
alreadySubscribed[alreadySubscribedKey] = true
}

subscribeSlots = append(subscribeSlots, attesterSlot)
subscribeCommitteeIDs = append(subscribeCommitteeIDs, committeeIndex)
subscribeIsAggregator = append(subscribeIsAggregator, aggregator)
log.WithFields(lFields).Info("New assignment")
}
}

Expand All @@ -379,7 +362,7 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error {
return err
}
for _, duty := range dutiesNextEpoch.Duties {
if duty.Status == ethpb.ValidatorStatus_ACTIVE {
if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING {
attesterSlot := duty.AttesterSlot
committeeIndex := duty.CommitteeIndex

Expand Down Expand Up @@ -521,6 +504,54 @@ func (v *validator) domainData(ctx context.Context, epoch uint64, domain []byte)
return res, nil
}

func (v *validator) logDuties(slot uint64, duties []*ethpb.DutiesResponse_Duty) {
attesterKeys := make([][]string, params.BeaconConfig().SlotsPerEpoch)
for i := range attesterKeys {
attesterKeys[i] = make([]string, 0)
}
proposerKeys := make([]string, params.BeaconConfig().SlotsPerEpoch)
slotOffset := helpers.StartSlot(helpers.SlotToEpoch(slot))

for _, duty := range duties {
if v.emitAccountMetrics {
fmtKey := fmt.Sprintf("%#x", duty.PublicKey)
validatorStatusesGaugeVec.WithLabelValues(fmtKey).Set(float64(duty.Status))
}

// Only interested in validators who are attesting/proposing.
// Note that SLASHING validators will have duties but their results are ignored by the network so we don't bother with them.
if duty.Status != ethpb.ValidatorStatus_ACTIVE && duty.Status != ethpb.ValidatorStatus_EXITING {
continue
}

validatorKey := fmt.Sprintf("%#x", bytesutil.Trunc(duty.PublicKey))
attesterIndex := duty.AttesterSlot - slotOffset
if attesterIndex >= params.BeaconConfig().SlotsPerEpoch {
log.WithField("duty", duty).Warn("Invalid attester slot")
} else {
attesterKeys[duty.AttesterSlot-slotOffset] = append(attesterKeys[duty.AttesterSlot-slotOffset], validatorKey)
}

for _, proposerSlot := range duty.ProposerSlots {
proposerIndex := proposerSlot - slotOffset
if proposerIndex >= params.BeaconConfig().SlotsPerEpoch {
log.WithField("duty", duty).Warn("Invalid proposer slot")
} else {
proposerKeys[proposerIndex] = validatorKey
}
}
}

for i := uint64(0); i < params.BeaconConfig().SlotsPerEpoch; i++ {
if len(attesterKeys[i]) > 0 {
log.WithField("slot", slotOffset+i).WithField("attesters", len(attesterKeys[i])).WithField("pubKeys", attesterKeys[i]).Info("Attestation schedule")
}
if proposerKeys[i] != "" {
log.WithField("slot", slotOffset+i).WithField("pubKey", proposerKeys[i]).Info("Proposal schedule")
}
}
}

// This constructs a validator subscribed key, it's used to track
// which subnet has already been pending requested.
func validatorSubscribeKey(slot uint64, committeeID uint64) [64]byte {
Expand Down