Skip to content

Commit

Permalink
Improve vc logs (#13573)
Browse files Browse the repository at this point in the history
* duties

* atts

* revert some changes

* revert timeTillDuty

* Manu's review

* Revert "Auxiliary commit to revert individual files from 6806ca9"

This reverts commit 0820c870d2627950179b0edf7ce62ee4fa4a03a3.

* remove trash

* more reivew

* making Manu happy

* test fixes
  • Loading branch information
rkapka authored Feb 8, 2024
1 parent 5afb125 commit 91504eb
Show file tree
Hide file tree
Showing 22 changed files with 236 additions and 166 deletions.
2 changes: 1 addition & 1 deletion validator/accounts/accounts_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (acm *CLIManager) Delete(ctx context.Context) error {
}); err != nil {
return err
}
log.WithField("publicKeys", allAccountStr).Warn(
log.WithField("pubkeys", allAccountStr).Warn(
"Attempted to delete accounts. IMPORTANT: please run `validator accounts list` to ensure " +
"the public keys are indeed deleted. If they are still there, please file an issue at " +
"https://github.com/prysmaticlabs/prysm/issues/new")
Expand Down
2 changes: 1 addition & 1 deletion validator/accounts/accounts_exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func displayExitInfo(rawExitedKeys [][]byte, trimmedExitedKeys []string) {
info := fmt.Sprintf("Voluntary exit was successful for the accounts listed. "+
"URLs where you can track each validator's exit:\n"+strings.Repeat("%s\n", len(ifaceKeys)), ifaceKeys...)

log.WithField("publicKeys", strings.Join(trimmedExitedKeys, ", ")).Info(info)
log.WithField("pubkeys", strings.Join(trimmedExitedKeys, ", ")).Info(info)
} else {
log.Info("No successful voluntary exits")
}
Expand Down
4 changes: 2 additions & 2 deletions validator/accounts/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Validator struct {
proposerSettings *validatorserviceconfig.ProposerSettings
}

func (_ *Validator) LogSyncCommitteeMessagesSubmitted() {}
func (_ *Validator) LogSubmittedSyncCommitteeMessages() {}

func (_ *Validator) Done() {
panic("implement me")
Expand Down Expand Up @@ -154,7 +154,7 @@ func (_ *Validator) SubmitSignedContributionAndProof(_ context.Context, _ primit
panic("implement me")
}

func (_ *Validator) LogAttestationsSubmitted() {
func (_ *Validator) LogSubmittedAtts(_ primitives.Slot) {
panic("implement me")
}

Expand Down
15 changes: 1 addition & 14 deletions validator/client/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives
return
}

if err := v.addIndicesToLog(duty); err != nil {
if err := v.saveSubmittedAtt(res.AggregateAndProof.Aggregate.Data, pubKey[:], true); err != nil {
log.WithError(err).Error("Could not add aggregator indices to logs")
if v.emitAccountMetrics {
ValidatorAggFailVec.WithLabelValues(fmtKey).Inc()
Expand Down Expand Up @@ -216,16 +216,3 @@ func (v *validator) aggregateAndProofSig(ctx context.Context, pubKey [fieldparam

return sig.Marshal(), nil
}

func (v *validator) addIndicesToLog(duty *ethpb.DutiesResponse_Duty) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, log := range v.attLogs {
if duty.CommitteeIndex == log.data.CommitteeIndex {
log.aggregatorIndices = append(log.aggregatorIndices, duty.ValidatorIndex)
}
}

return nil
}
44 changes: 12 additions & 32 deletions validator/client/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package client
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v4/async"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v4/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -55,7 +54,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
defer lock.Unlock()

fmtKey := fmt.Sprintf("%#x", pubKey[:])
log := log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).WithField("slot", slot)
log := log.WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).WithField("slot", slot)
duty, err := v.duty(pubKey)
if err != nil {
log.WithError(err).Error("Could not fetch validator assignment")
Expand Down Expand Up @@ -154,7 +153,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
return
}

if err := v.saveAttesterIndexToData(data, duty.ValidatorIndex); err != nil {
if err := v.saveSubmittedAtt(data, pubKey[:], false); err != nil {
log.WithError(err).Error("Could not save validator index for logging")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
Expand Down Expand Up @@ -228,25 +227,6 @@ func (v *validator) getDomainAndSigningRoot(ctx context.Context, data *ethpb.Att
return domain, root, nil
}

// For logging, this saves the last submitted attester index to its attestation data. The purpose of this
// is to enhance attesting logs to be readable when multiple validator keys ran in a single client.
func (v *validator) saveAttesterIndexToData(data *ethpb.AttestationData, index primitives.ValidatorIndex) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

h, err := hash.Proto(data)
if err != nil {
return err
}

if v.attLogs[h] == nil {
v.attLogs[h] = &attSubmitted{data, []primitives.ValidatorIndex{}, []primitives.ValidatorIndex{}}
}
v.attLogs[h] = &attSubmitted{data, append(v.attLogs[h].attesterIndices, index), []primitives.ValidatorIndex{}}

return nil
}

// highestSlot returns the highest slot with a valid block seen by the validator
func (v *validator) highestSlot() primitives.Slot {
v.highestValidSlotLock.Lock()
Expand Down Expand Up @@ -313,14 +293,14 @@ func (v *validator) waitOneThirdOrValidBlock(ctx context.Context, slot primitive

func attestationLogFields(pubKey [fieldparams.BLSPubkeyLength]byte, indexedAtt *ethpb.IndexedAttestation) logrus.Fields {
return logrus.Fields{
"attesterPublicKey": fmt.Sprintf("%#x", pubKey),
"attestationSlot": indexedAtt.Data.Slot,
"committeeIndex": indexedAtt.Data.CommitteeIndex,
"beaconBlockRoot": fmt.Sprintf("%#x", indexedAtt.Data.BeaconBlockRoot),
"sourceEpoch": indexedAtt.Data.Source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", indexedAtt.Data.Source.Root),
"targetEpoch": indexedAtt.Data.Target.Epoch,
"targetRoot": fmt.Sprintf("%#x", indexedAtt.Data.Target.Root),
"signature": fmt.Sprintf("%#x", indexedAtt.Signature),
"pubkey": fmt.Sprintf("%#x", pubKey),
"slot": indexedAtt.Data.Slot,
"committeeIndex": indexedAtt.Data.CommitteeIndex,
"blockRoot": fmt.Sprintf("%#x", indexedAtt.Data.BeaconBlockRoot),
"sourceEpoch": indexedAtt.Data.Source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", indexedAtt.Data.Source.Root),
"targetEpoch": indexedAtt.Data.Target.Epoch,
"targetRoot": fmt.Sprintf("%#x", indexedAtt.Data.Target.Root),
"signature": fmt.Sprintf("%#x", indexedAtt.Signature),
}
}
4 changes: 2 additions & 2 deletions validator/client/iface/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type Validator interface {
SubmitAggregateAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSyncCommitteeMessage(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSignedContributionAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
LogAttestationsSubmitted()
LogSyncCommitteeMessagesSubmitted()
LogSubmittedAtts(slot primitives.Slot)
LogSubmittedSyncCommitteeMessages()
UpdateDomainDataCaches(ctx context.Context, slot primitives.Slot)
WaitForKeymanagerInitialization(ctx context.Context) error
Keymanager() (keymanager.IKeymanager, error)
Expand Down
136 changes: 116 additions & 20 deletions validator/client/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,142 @@ package client

import (
"fmt"
"strconv"
"sync/atomic"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("prefix", "validator")
var log = logrus.WithField("prefix", "client")

type attSubmitted struct {
data *ethpb.AttestationData
attesterIndices []primitives.ValidatorIndex
aggregatorIndices []primitives.ValidatorIndex
type submittedAttData struct {
beaconBlockRoot []byte
source *ethpb.Checkpoint
target *ethpb.Checkpoint
}

// LogAttestationsSubmitted logs info about submitted attestations.
func (v *validator) LogAttestationsSubmitted() {
type submittedAtt struct {
data submittedAttData
pubkeys [][]byte
committees []primitives.CommitteeIndex
}

// submittedAttKey is defined as a concatenation of:
// - AttestationData.BeaconBlockRoot
// - AttestationData.Source.HashTreeRoot()
// - AttestationData.Target.HashTreeRoot()
type submittedAttKey [96]byte

func (k submittedAttKey) FromAttData(data *ethpb.AttestationData) error {
sourceRoot, err := data.Source.HashTreeRoot()
if err != nil {
return err
}
targetRoot, err := data.Target.HashTreeRoot()
if err != nil {
return err
}
copy(k[0:], data.BeaconBlockRoot)
copy(k[32:], sourceRoot[:])
copy(k[64:], targetRoot[:])
return nil
}

// saveSubmittedAtt saves the submitted attestation data along with the attester's pubkey.
// The purpose of this is to display combined attesting logs for all keys managed by the validator client.
func (v *validator) saveSubmittedAtt(data *ethpb.AttestationData, pubkey []byte, isAggregate bool) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

key := submittedAttKey{}
if err := key.FromAttData(data); err != nil {
return errors.Wrapf(err, "could not create submitted attestation key")
}
d := submittedAttData{
beaconBlockRoot: data.BeaconBlockRoot,
source: data.Source,
target: data.Target,
}

var submittedAtts map[submittedAttKey]*submittedAtt
if isAggregate {
submittedAtts = v.submittedAggregates
} else {
submittedAtts = v.submittedAtts
}

if submittedAtts[key] == nil {
submittedAtts[key] = &submittedAtt{
d,
[][]byte{},
[]primitives.CommitteeIndex{},
}
}
submittedAtts[key] = &submittedAtt{
d,
append(submittedAtts[key].pubkeys, pubkey),
append(submittedAtts[key].committees, data.CommitteeIndex),
}

return nil
}

// LogSubmittedAtts logs info about submitted attestations.
func (v *validator) LogSubmittedAtts(slot primitives.Slot) {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, attLog := range v.attLogs {
for _, attLog := range v.submittedAtts {
pubkeys := make([]string, len(attLog.pubkeys))
for i, p := range attLog.pubkeys {
pubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
committees := make([]string, len(attLog.committees))
for i, c := range attLog.committees {
committees[i] = strconv.FormatUint(uint64(c), 10)
}
log.WithFields(logrus.Fields{
"Slot": attLog.data.Slot,
"CommitteeIndex": attLog.data.CommitteeIndex,
"BeaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)),
"SourceEpoch": attLog.data.Source.Epoch,
"SourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)),
"TargetEpoch": attLog.data.Target.Epoch,
"TargetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)),
"AttesterIndices": attLog.attesterIndices,
"AggregatorIndices": attLog.aggregatorIndices,
"slot": slot,
"committeeIndices": committees,
"pubkeys": pubkeys,
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.beaconBlockRoot)),
"sourceEpoch": attLog.data.source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.source.Root)),
"targetEpoch": attLog.data.target.Epoch,
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.target.Root)),
}).Info("Submitted new attestations")
}
for _, attLog := range v.submittedAggregates {
pubkeys := make([]string, len(attLog.pubkeys))
for i, p := range attLog.pubkeys {
pubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
committees := make([]string, len(attLog.committees))
for i, c := range attLog.committees {
committees[i] = strconv.FormatUint(uint64(c), 10)
}
log.WithFields(logrus.Fields{
"slot": slot,
"committeeIndices": committees,
"pubkeys": pubkeys,
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.beaconBlockRoot)),
"sourceEpoch": attLog.data.source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.source.Root)),
"targetEpoch": attLog.data.target.Epoch,
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.target.Root)),
}).Info("Submitted new aggregate attestations")
}

v.attLogs = make(map[[32]byte]*attSubmitted)
v.submittedAtts = make(map[submittedAttKey]*submittedAtt)
v.submittedAggregates = make(map[submittedAttKey]*submittedAtt)
}

// LogSyncCommitteeMessagesSubmitted logs info about submitted sync committee messages.
func (v *validator) LogSyncCommitteeMessagesSubmitted() {
// LogSubmittedSyncCommitteeMessages logs info about submitted sync committee messages.
func (v *validator) LogSubmittedSyncCommitteeMessages() {
log.WithField("messages", v.syncCommitteeStats.totalMessagesSubmitted).Debug("Submitted sync committee messages successfully to beacon node")
// Reset the amount.
atomic.StoreUint64(&v.syncCommitteeStats.totalMessagesSubmitted, 0)
Expand Down
12 changes: 6 additions & 6 deletions validator/client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,25 +298,25 @@ func (v *validator) logForEachValidator(index int, pubKey []byte, resp *ethpb.Va
if index < len(resp.BalancesBeforeEpochTransition) {
balBeforeEpoch = resp.BalancesBeforeEpochTransition[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing balance before epoch transition")
log.WithField("pubkey", truncatedKey).Warn("Missing balance before epoch transition")
}
if index < len(resp.BalancesAfterEpochTransition) {
balAfterEpoch = resp.BalancesAfterEpochTransition[index]
}
if index < len(resp.CorrectlyVotedSource) {
correctlyVotedSource = resp.CorrectlyVotedSource[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted source")
log.WithField("pubkey", truncatedKey).Warn("Missing correctly voted source")
}
if index < len(resp.CorrectlyVotedTarget) {
correctlyVotedTarget = resp.CorrectlyVotedTarget[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted target")
log.WithField("pubkey", truncatedKey).Warn("Missing correctly voted target")
}
if index < len(resp.CorrectlyVotedHead) {
correctlyVotedHead = resp.CorrectlyVotedHead[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted head")
log.WithField("pubkey", truncatedKey).Warn("Missing correctly voted head")
}

if _, ok := v.startBalances[pubKeyBytes]; !ok {
Expand All @@ -333,7 +333,7 @@ func (v *validator) logForEachValidator(index int, pubKey []byte, resp *ethpb.Va
percentSinceStart := (newBalance - startBalance) / startBalance

previousEpochSummaryFields := logrus.Fields{
"pubKey": truncatedKey,
"pubkey": truncatedKey,
"epoch": prevEpoch,
"correctlyVotedSource": correctlyVotedSource,
"correctlyVotedTarget": correctlyVotedTarget,
Expand All @@ -349,7 +349,7 @@ func (v *validator) logForEachValidator(index int, pubKey []byte, resp *ethpb.Va
if index < len(resp.InactivityScores) {
previousEpochSummaryFields["inactivityScore"] = resp.InactivityScores[index]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inactivity score")
log.WithField("pubkey", truncatedKey).Warn("Missing inactivity score")
}
}

Expand Down
Loading

0 comments on commit 91504eb

Please sign in to comment.