Skip to content

Commit

Permalink
atts
Browse files Browse the repository at this point in the history
  • Loading branch information
rkapka committed Feb 1, 2024
1 parent 4b78dd8 commit 407bb2f
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 41 deletions.
4 changes: 2 additions & 2 deletions validator/accounts/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Validator struct {
proposerSettings *validatorserviceconfig.ProposerSettings
}

func (_ *Validator) LogSyncCommitteeMessagesSubmitted() {}
func (_ *Validator) LogSubmittedSyncCommitteeMessages() {}

func (_ *Validator) Done() {
panic("implement me")
Expand Down Expand Up @@ -154,7 +154,7 @@ func (_ *Validator) SubmitSignedContributionAndProof(_ context.Context, _ primit
panic("implement me")
}

func (_ *Validator) LogAttestationsSubmitted() {
func (_ *Validator) LogSubmittedAtts(_ primitives.Slot) {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions validator/client/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,9 @@ func (v *validator) addIndicesToLog(duty *ethpb.DutiesResponse_Duty) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, log := range v.attLogs {
if duty.CommitteeIndex == log.data.CommitteeIndex {
log.aggregatorIndices = append(log.aggregatorIndices, duty.ValidatorIndex)
for _, l := range v.submittedAtts {
if duty.CommitteeIndex == l.data.CommitteeIndex {
l.aggregatorPubkeys = append(l.aggregatorPubkeys, duty.PublicKey)
}
}

Expand Down
22 changes: 13 additions & 9 deletions validator/client/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/hash"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -144,6 +143,11 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
tracing.AnnotateError(span, err)
return
}
r, err := data.HashTreeRoot()
if err != nil {
return
}
logrus.Infof("Submitting attestation %#x", r)
attResp, err := v.validatorClient.ProposeAttestation(ctx, attestation)
if err != nil {
log.WithError(err).Error("Could not submit attestation to beacon node")
Expand All @@ -154,7 +158,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
return
}

if err := v.saveAttesterIndexToData(data, duty.ValidatorIndex); err != nil {
if err := v.saveSubmittedAtt(data, duty.PublicKey); err != nil {
log.WithError(err).Error("Could not save validator index for logging")
if v.emitAccountMetrics {
ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc()
Expand Down Expand Up @@ -228,21 +232,21 @@ func (v *validator) getDomainAndSigningRoot(ctx context.Context, data *ethpb.Att
return domain, root, nil
}

// For logging, this saves the last submitted attester index to its attestation data. The purpose of this
// is to enhance attesting logs to be readable when multiple validator keys ran in a single client.
func (v *validator) saveAttesterIndexToData(data *ethpb.AttestationData, index primitives.ValidatorIndex) error {
// saveSubmittedAtt saves the submitted attestation data along with the attester's pubkey.
// The purpose of this is to display combined attesting logs for all keys managed by the validator client.
func (v *validator) saveSubmittedAtt(data *ethpb.AttestationData, pubkey []byte) error {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

h, err := hash.Proto(data)
r, err := data.HashTreeRoot()
if err != nil {
return err
}

if v.attLogs[h] == nil {
v.attLogs[h] = &attSubmitted{data, []primitives.ValidatorIndex{}, []primitives.ValidatorIndex{}}
if v.submittedAtts[r] == nil {
v.submittedAtts[r] = &submittedAtt{data, [][]byte{}, [][]byte{}}
}
v.attLogs[h] = &attSubmitted{data, append(v.attLogs[h].attesterIndices, index), []primitives.ValidatorIndex{}}
v.submittedAtts[r] = &submittedAtt{data, append(v.submittedAtts[r].attesterPubkeys, pubkey), [][]byte{}}

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions validator/client/iface/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type Validator interface {
SubmitAggregateAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSyncCommitteeMessage(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
SubmitSignedContributionAndProof(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte)
LogAttestationsSubmitted()
LogSyncCommitteeMessagesSubmitted()
LogSubmittedAtts(slot primitives.Slot)
LogSubmittedSyncCommitteeMessages()
UpdateDomainDataCaches(ctx context.Context, slot primitives.Slot)
WaitForKeymanagerInitialization(ctx context.Context) error
Keymanager() (keymanager.IKeymanager, error)
Expand Down
46 changes: 29 additions & 17 deletions validator/client/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,48 @@ import (

var log = logrus.WithField("prefix", "validator")

type attSubmitted struct {
type submittedAtt struct {
data *ethpb.AttestationData
attesterIndices []primitives.ValidatorIndex
aggregatorIndices []primitives.ValidatorIndex
attesterPubkeys [][]byte
aggregatorPubkeys [][]byte
}

// LogAttestationsSubmitted logs info about submitted attestations.
func (v *validator) LogAttestationsSubmitted() {
// LogSubmittedAtts logs info about submitted attestations.
func (v *validator) LogSubmittedAtts(slot primitives.Slot) {
v.attLogsLock.Lock()
defer v.attLogsLock.Unlock()

for _, attLog := range v.attLogs {
if len(v.submittedAtts) == 0 {
return
}

log.Infof("Submitted new attestations for slot %d", slot)
for _, attLog := range v.submittedAtts {
attesterPubkeys := make([]string, len(attLog.attesterPubkeys))
for i, p := range attLog.attesterPubkeys {
attesterPubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
aggregatorPubkeys := make([]string, len(attLog.aggregatorPubkeys))
for i, p := range attLog.aggregatorPubkeys {
aggregatorPubkeys[i] = fmt.Sprintf("%#x", bytesutil.Trunc(p))
}
log.WithFields(logrus.Fields{
"Slot": attLog.data.Slot,
"CommitteeIndex": attLog.data.CommitteeIndex,
"BeaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)),
"SourceEpoch": attLog.data.Source.Epoch,
"SourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)),
"TargetEpoch": attLog.data.Target.Epoch,
"TargetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)),
"AttesterIndices": attLog.attesterIndices,
"AggregatorIndices": attLog.aggregatorIndices,
"committeeIndex": attLog.data.CommitteeIndex,
"beaconBlockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.BeaconBlockRoot)),
"sourceEpoch": attLog.data.Source.Epoch,
"sourceRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Source.Root)),
"targetEpoch": attLog.data.Target.Epoch,
"targetRoot": fmt.Sprintf("%#x", bytesutil.Trunc(attLog.data.Target.Root)),
"attesterPubkeys": attesterPubkeys,
"aggregatorPubkeys": aggregatorPubkeys,
}).Info("Submitted new attestations")
}

v.attLogs = make(map[[32]byte]*attSubmitted)
v.submittedAtts = make(map[[32]byte]*submittedAtt)
}

// LogSyncCommitteeMessagesSubmitted logs info about submitted sync committee messages.
func (v *validator) LogSyncCommitteeMessagesSubmitted() {
func (v *validator) LogSubmittedSyncCommitteeMessages() {
log.WithField("messages", v.syncCommitteeStats.totalMessagesSubmitted).Debug("Submitted sync committee messages successfully to beacon node")
// Reset the amount.
atomic.StoreUint64(&v.syncCommitteeStats.totalMessagesSubmitted, 0)
Expand Down
2 changes: 1 addition & 1 deletion validator/client/propose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func setupWithKey(t *testing.T, validatorKey bls.SecretKey) (*validator, *mocks,
keyManager: newMockKeymanager(t, keypair{pub: pubKey, pri: validatorKey}),
validatorClient: m.validatorClient,
graffiti: []byte{},
attLogs: make(map[[32]byte]*attSubmitted),
submittedAtts: make(map[[32]byte]*submittedAtt),
aggregatedSlotCommitteeIDCache: aggregatedSlotCommitteeIDCache,
}

Expand Down
6 changes: 3 additions & 3 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.Validat
" should never happen! Please file a report at github.com/prysmaticlabs/prysm/issues/new")
}
}()
// Log this client performance in the previous epoch
v.LogAttestationsSubmitted()
v.LogSyncCommitteeMessagesSubmitted()
// Log performance in the previous slot
v.LogSubmittedAtts(slot)
v.LogSubmittedSyncCommitteeMessages()
if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil {
log.WithError(err).Error("Could not report validator's rewards/penalties")
}
Expand Down
2 changes: 1 addition & 1 deletion validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (v *ValidatorService) Start() {
prevBalance: make(map[[fieldparams.BLSPubkeyLength]byte]uint64),
pubkeyToValidatorIndex: make(map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex),
signedValidatorRegistrations: make(map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1),
attLogs: make(map[[32]byte]*attSubmitted),
submittedAtts: make(map[[32]byte]*submittedAtt),
domainDataCache: cache,
aggregatedSlotCommitteeIDCache: aggregatedSlotCommitteeIDCache,
voteStats: voteStats{startEpoch: primitives.Epoch(^uint64(0))},
Expand Down
4 changes: 2 additions & 2 deletions validator/client/testutil/mock_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (fv *FakeValidator) WaitForKeymanagerInitialization(_ context.Context) erro
}

// LogSyncCommitteeMessagesSubmitted --
func (fv *FakeValidator) LogSyncCommitteeMessagesSubmitted() {}
func (fv *FakeValidator) LogSubmittedSyncCommitteeMessages() {}

// WaitForChainStart for mocking.
func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
Expand Down Expand Up @@ -182,7 +182,7 @@ func (*FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives
}

// LogAttestationsSubmitted for mocking.
func (*FakeValidator) LogAttestationsSubmitted() {}
func (*FakeValidator) LogSubmittedAtts(_ primitives.Slot) {}

// UpdateDomainDataCaches for mocking.
func (*FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {}
Expand Down
2 changes: 1 addition & 1 deletion validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type validator struct {
slashableKeysLock sync.RWMutex
eipImportBlacklistedPublicKeys map[[fieldparams.BLSPubkeyLength]byte]bool
walletInitializedFeed *event.Feed
attLogs map[[32]byte]*attSubmitted
submittedAtts map[[32]byte]*submittedAtt
startBalances map[[fieldparams.BLSPubkeyLength]byte]uint64
dutiesLock sync.RWMutex
duties *ethpb.DutiesResponse
Expand Down

0 comments on commit 407bb2f

Please sign in to comment.