From 407bb2ff5457908eb901bb546c0a4404d79caf2d Mon Sep 17 00:00:00 2001 From: rkapka Date: Thu, 1 Feb 2024 18:28:27 +0100 Subject: [PATCH] atts --- validator/accounts/testing/mock.go | 4 +- validator/client/aggregate.go | 6 +-- validator/client/attest.go | 22 ++++++---- validator/client/iface/validator.go | 4 +- validator/client/log.go | 46 +++++++++++++-------- validator/client/propose_test.go | 2 +- validator/client/runner.go | 6 +-- validator/client/service.go | 2 +- validator/client/testutil/mock_validator.go | 4 +- validator/client/validator.go | 2 +- 10 files changed, 57 insertions(+), 41 deletions(-) diff --git a/validator/accounts/testing/mock.go b/validator/accounts/testing/mock.go index 3a92d08c1f01..0deb8e40d0d6 100644 --- a/validator/accounts/testing/mock.go +++ b/validator/accounts/testing/mock.go @@ -92,7 +92,7 @@ type Validator struct { proposerSettings *validatorserviceconfig.ProposerSettings } -func (_ *Validator) LogSyncCommitteeMessagesSubmitted() {} +func (_ *Validator) LogSubmittedSyncCommitteeMessages() {} func (_ *Validator) Done() { panic("implement me") @@ -154,7 +154,7 @@ func (_ *Validator) SubmitSignedContributionAndProof(_ context.Context, _ primit panic("implement me") } -func (_ *Validator) LogAttestationsSubmitted() { +func (_ *Validator) LogSubmittedAtts(_ primitives.Slot) { panic("implement me") } diff --git a/validator/client/aggregate.go b/validator/client/aggregate.go index 37cc1d2aeb5d..97f6d428b002 100644 --- a/validator/client/aggregate.go +++ b/validator/client/aggregate.go @@ -209,9 +209,9 @@ func (v *validator) addIndicesToLog(duty *ethpb.DutiesResponse_Duty) error { v.attLogsLock.Lock() defer v.attLogsLock.Unlock() - for _, log := range v.attLogs { - if duty.CommitteeIndex == log.data.CommitteeIndex { - log.aggregatorIndices = append(log.aggregatorIndices, duty.ValidatorIndex) + for _, l := range v.submittedAtts { + if duty.CommitteeIndex == l.data.CommitteeIndex { + l.aggregatorPubkeys = append(l.aggregatorPubkeys, duty.PublicKey) } } diff --git a/validator/client/attest.go b/validator/client/attest.go index 58c7eab76d1d..57c05289b201 100644 --- a/validator/client/attest.go +++ b/validator/client/attest.go @@ -15,7 +15,6 @@ import ( fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" - "github.com/prysmaticlabs/prysm/v4/crypto/hash" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/monitoring/tracing" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -144,6 +143,11 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot, tracing.AnnotateError(span, err) return } + r, err := data.HashTreeRoot() + if err != nil { + return + } + logrus.Infof("Submitting attestation %#x", r) attResp, err := v.validatorClient.ProposeAttestation(ctx, attestation) if err != nil { log.WithError(err).Error("Could not submit attestation to beacon node") @@ -154,7 +158,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot, return } - if err := v.saveAttesterIndexToData(data, duty.ValidatorIndex); err != nil { + if err := v.saveSubmittedAtt(data, duty.PublicKey); err != nil { log.WithError(err).Error("Could not save validator index for logging") if v.emitAccountMetrics { ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc() @@ -228,21 +232,21 @@ func (v *validator) getDomainAndSigningRoot(ctx context.Context, data *ethpb.Att return domain, root, nil } -// For logging, this saves the last submitted attester index to its attestation data. The purpose of this -// is to enhance attesting logs to be readable when multiple validator keys ran in a single client. -func (v *validator) saveAttesterIndexToData(data *ethpb.AttestationData, index primitives.ValidatorIndex) error { +// saveSubmittedAtt saves the submitted attestation data along with the attester's pubkey. +// The purpose of this is to display combined attesting logs for all keys managed by the validator client. +func (v *validator) saveSubmittedAtt(data *ethpb.AttestationData, pubkey []byte) error { v.attLogsLock.Lock() defer v.attLogsLock.Unlock() - h, err := hash.Proto(data) + r, err := data.HashTreeRoot() if err != nil { return err } - if v.attLogs[h] == nil { - v.attLogs[h] = &attSubmitted{data, []primitives.ValidatorIndex{}, []primitives.ValidatorIndex{}} + if v.submittedAtts[r] == nil { + v.submittedAtts[r] = &submittedAtt{data, [][]byte{}, [][]byte{}} } - v.attLogs[h] = &attSubmitted{data, append(v.attLogs[h].attesterIndices, index), []primitives.ValidatorIndex{}} + v.submittedAtts[r] = &submittedAtt{data, append(v.submittedAtts[r].attesterPubkeys, pubkey), [][]byte{}} return nil } diff --git a/validator/client/iface/validator.go b/validator/client/iface/validator.go index ebd4e7b41231..a22493b698c1 100644 --- a/validator/client/iface/validator.go +++ b/validator/client/iface/validator.go @@ -52,8 +52,8 @@ type Validator interface { SubmitAggregateAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) SubmitSyncCommitteeMessage(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) SubmitSignedContributionAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) - LogAttestationsSubmitted() - LogSyncCommitteeMessagesSubmitted() + LogSubmittedAtts(slot primitives.Slot) + LogSubmittedSyncCommitteeMessages() UpdateDomainDataCaches(ctx context.Context, slot primitives.Slot) WaitForKeymanagerInitialization(ctx context.Context) error Keymanager() (keymanager.IKeymanager, error) diff --git a/validator/client/log.go b/validator/client/log.go index 482eb48b7b56..a406d2c07668 100644 --- a/validator/client/log.go +++ b/validator/client/log.go @@ -12,36 +12,48 @@ import ( var log = logrus.WithField("prefix", "validator") -type attSubmitted struct { +type submittedAtt struct { data *ethpb.AttestationData - attesterIndices []primitives.ValidatorIndex - aggregatorIndices []primitives.ValidatorIndex + attesterPubkeys [][]byte + aggregatorPubkeys [][]byte } -// LogAttestationsSubmitted logs info about submitted attestations. -func (v *validator) LogAttestationsSubmitted() { +// LogSubmittedAtts logs info about submitted attestations. +func (v *validator) LogSubmittedAtts(slot primitives.Slot) { v.attLogsLock.Lock() defer v.attLogsLock.Unlock() - for _, attLog := range v.attLogs { + if len(v.submittedAtts) == 0 { + return + } + + log.Infof("Submitted new attestations for slot %d", slot) + for _, attLog := range v.submittedAtts { + attesterPubkeys := make([]string, len(attLog.attesterPubkeys)) + for i, p := range attLog.attesterPubkeys { + attesterPubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p)) + } + aggregatorPubkeys := make([]string, len(attLog.aggregatorPubkeys)) + for i, p := range attLog.aggregatorPubkeys { + aggregatorPubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p)) + } log.WithFields(logrus.Fields{ - "Slot": attLog.data.Slot, - "CommitteeIndex": attLog.data.CommitteeIndex, - "BeaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)), - "SourceEpoch": attLog.data.Source.Epoch, - "SourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)), - "TargetEpoch": attLog.data.Target.Epoch, - "TargetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)), - "AttesterIndices": attLog.attesterIndices, - "AggregatorIndices": attLog.aggregatorIndices, + "committeeIndex": attLog.data.CommitteeIndex, + "beaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)), + "sourceEpoch": attLog.data.Source.Epoch, + "sourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)), + "targetEpoch": attLog.data.Target.Epoch, + "targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)), + "attesterPubkeys": attesterPubkeys, + "aggregatorPubkeys": aggregatorPubkeys, }).Info("Submitted new attestations") } - v.attLogs = make(map[[32]byte]*attSubmitted) + v.submittedAtts = make(map[[32]byte]*submittedAtt) } // LogSyncCommitteeMessagesSubmitted logs info about submitted sync committee messages. -func (v *validator) LogSyncCommitteeMessagesSubmitted() { +func (v *validator) LogSubmittedSyncCommitteeMessages() { log.WithField("messages", v.syncCommitteeStats.totalMessagesSubmitted).Debug("Submitted sync committee messages successfully to beacon node") // Reset the amount. atomic.StoreUint64(&v.syncCommitteeStats.totalMessagesSubmitted, 0) diff --git a/validator/client/propose_test.go b/validator/client/propose_test.go index 4c863767d937..e9c8bd671f44 100644 --- a/validator/client/propose_test.go +++ b/validator/client/propose_test.go @@ -88,7 +88,7 @@ func setupWithKey(t *testing.T, validatorKey bls.SecretKey) (*validator, *mocks, keyManager: newMockKeymanager(t, keypair{pub: pubKey, pri: validatorKey}), validatorClient: m.validatorClient, graffiti: []byte{}, - attLogs: make(map[[32]byte]*attSubmitted), + submittedAtts: make(map[[32]byte]*submittedAtt), aggregatedSlotCommitteeIDCache: aggregatedSlotCommitteeIDCache, } diff --git a/validator/client/runner.go b/validator/client/runner.go index 020fa468ee7e..78c5da2090a2 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -263,9 +263,9 @@ func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.Validat " should never happen! Please file a report at github.com/prysmaticlabs/prysm/issues/new") } }() - // Log this client performance in the previous epoch - v.LogAttestationsSubmitted() - v.LogSyncCommitteeMessagesSubmitted() + // Log performance in the previous slot + v.LogSubmittedAtts(slot) + v.LogSubmittedSyncCommitteeMessages() if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil { log.WithError(err).Error("Could not report validator's rewards/penalties") } diff --git a/validator/client/service.go b/validator/client/service.go index d10d3a80c2b1..8a5f8f343a51 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -213,7 +213,7 @@ func (v *ValidatorService) Start() { prevBalance: make(map[[fieldparams.BLSPubkeyLength]byte]uint64), pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex), signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1), - attLogs: make(map[[32]byte]*attSubmitted), + submittedAtts: make(map[[32]byte]*submittedAtt), domainDataCache: cache, aggregatedSlotCommitteeIDCache: aggregatedSlotCommitteeIDCache, voteStats: voteStats{startEpoch: primitives.Epoch(^uint64(0))}, diff --git a/validator/client/testutil/mock_validator.go b/validator/client/testutil/mock_validator.go index e2affbc7679d..7dfbacbb97d6 100644 --- a/validator/client/testutil/mock_validator.go +++ b/validator/client/testutil/mock_validator.go @@ -69,7 +69,7 @@ func (fv *FakeValidator) WaitForKeymanagerInitialization(_ context.Context) erro } // LogSyncCommitteeMessagesSubmitted -- -func (fv *FakeValidator) LogSyncCommitteeMessagesSubmitted() {} +func (fv *FakeValidator) LogSubmittedSyncCommitteeMessages() {} // WaitForChainStart for mocking. func (fv *FakeValidator) WaitForChainStart(_ context.Context) error { @@ -182,7 +182,7 @@ func (*FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives } // LogAttestationsSubmitted for mocking. -func (*FakeValidator) LogAttestationsSubmitted() {} +func (*FakeValidator) LogSubmittedAtts(_ primitives.Slot) {} // UpdateDomainDataCaches for mocking. func (*FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {} diff --git a/validator/client/validator.go b/validator/client/validator.go index 611a72cae1b2..8d8e3282dcf0 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -75,7 +75,7 @@ type validator struct { slashableKeysLock sync.RWMutex eipImportBlacklistedPublicKeys map[[fieldparams.BLSPubkeyLength]byte]bool walletInitializedFeed *event.Feed - attLogs map[[32]byte]*attSubmitted + submittedAtts map[[32]byte]*submittedAtt startBalances map[[fieldparams.BLSPubkeyLength]byte]uint64 dutiesLock sync.RWMutex duties *ethpb.DutiesResponse