Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More tracing in the validator client #14125

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions validator/client/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives

// Signs input slot with domain selection proof. This is used to create the signature for aggregator selection.
func (v *validator) signSlotWithSelectionProof(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) (signature []byte, err error) {
ctx, span := trace.StartSpan(ctx, "validator.signSlotWithSelectionProof")
defer span.End()

domain, err := v.domainData(ctx, slots.ToEpoch(slot), params.BeaconConfig().DomainSelectionProof[:])
if err != nil {
return nil, err
Expand Down Expand Up @@ -194,6 +197,9 @@ func (v *validator) waitToSlotTwoThirds(ctx context.Context, slot primitives.Slo
// This returns the signature of validator signing over aggregate and
// proof object.
func (v *validator) aggregateAndProofSig(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, agg *ethpb.AggregateAttestationAndProof, slot primitives.Slot) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.aggregateAndProofSig")
defer span.End()

d, err := v.domainData(ctx, slots.ToEpoch(agg.Aggregate.Data.Slot), params.BeaconConfig().DomainAggregateAndProof[:])
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions validator/client/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func (v *validator) duty(pubKey [fieldparams.BLSPubkeyLength]byte) (*ethpb.Dutie

// Given validator's public key, this function returns the signature of an attestation data and its signing root.
func (v *validator) signAtt(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, data *ethpb.AttestationData, slot primitives.Slot) ([]byte, [32]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.signAtt")
defer span.End()

domain, root, err := v.domainAndSigningRoot(ctx, data)
if err != nil {
return nil, [32]byte{}, err
Expand Down
1 change: 1 addition & 0 deletions validator/client/beacon-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
Expand Down
80 changes: 78 additions & 2 deletions validator/client/beacon-api/beacon_api_validator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
"go.opencensus.io/trace"
)

type ValidatorClientOpt func(*beaconApiValidatorClient)
Expand Down Expand Up @@ -47,12 +48,16 @@ func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...Valida
}

func (c *beaconApiValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.Duties")
defer span.End()
return wrapInMetrics[*ethpb.DutiesResponse]("Duties", func() (*ethpb.DutiesResponse, error) {
return c.duties(ctx, in)
})
}

func (c *beaconApiValidatorClient) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.CheckDoppelGanger")
defer span.End()
return wrapInMetrics[*ethpb.DoppelGangerResponse]("CheckDoppelGanger", func() (*ethpb.DoppelGangerResponse, error) {
return c.checkDoppelGanger(ctx, in)
})
Expand All @@ -62,6 +67,10 @@ func (c *beaconApiValidatorClient) DomainData(ctx context.Context, in *ethpb.Dom
if len(in.Domain) != 4 {
return nil, errors.Errorf("invalid domain type: %s", hexutil.Encode(in.Domain))
}

ctx, span := trace.StartSpan(ctx, "beacon-api.DomainData")
defer span.End()

domainType := bytesutil.ToBytes4(in.Domain)

return wrapInMetrics[*ethpb.DomainResponse]("DomainData", func() (*ethpb.DomainResponse, error) {
Expand All @@ -70,12 +79,18 @@ func (c *beaconApiValidatorClient) DomainData(ctx context.Context, in *ethpb.Dom
}

func (c *beaconApiValidatorClient) AttestationData(ctx context.Context, in *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.AttestationData")
defer span.End()

return wrapInMetrics[*ethpb.AttestationData]("AttestationData", func() (*ethpb.AttestationData, error) {
return c.attestationData(ctx, in.Slot, in.CommitteeIndex)
})
}

func (c *beaconApiValidatorClient) BeaconBlock(ctx context.Context, in *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.BeaconBlock")
defer span.End()

return wrapInMetrics[*ethpb.GenericBeaconBlock]("BeaconBlock", func() (*ethpb.GenericBeaconBlock, error) {
return c.beaconBlock(ctx, in.Slot, in.RandaoReveal, in.Graffiti)
})
Expand All @@ -86,48 +101,72 @@ func (c *beaconApiValidatorClient) FeeRecipientByPubKey(_ context.Context, _ *et
}

func (c *beaconApiValidatorClient) SyncCommitteeContribution(ctx context.Context, in *ethpb.SyncCommitteeContributionRequest) (*ethpb.SyncCommitteeContribution, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SyncCommitteeContribution")
defer span.End()

return wrapInMetrics[*ethpb.SyncCommitteeContribution]("SyncCommitteeContribution", func() (*ethpb.SyncCommitteeContribution, error) {
return c.syncCommitteeContribution(ctx, in)
})
}

func (c *beaconApiValidatorClient) SyncMessageBlockRoot(ctx context.Context, _ *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SyncMessageBlockRoot")
defer span.End()

return wrapInMetrics[*ethpb.SyncMessageBlockRootResponse]("SyncMessageBlockRoot", func() (*ethpb.SyncMessageBlockRootResponse, error) {
return c.syncMessageBlockRoot(ctx)
})
}

func (c *beaconApiValidatorClient) SyncSubcommitteeIndex(ctx context.Context, in *ethpb.SyncSubcommitteeIndexRequest) (*ethpb.SyncSubcommitteeIndexResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SyncSubcommitteeIndex")
defer span.End()

return wrapInMetrics[*ethpb.SyncSubcommitteeIndexResponse]("SyncSubcommitteeIndex", func() (*ethpb.SyncSubcommitteeIndexResponse, error) {
return c.syncSubcommitteeIndex(ctx, in)
})
}

func (c *beaconApiValidatorClient) MultipleValidatorStatus(ctx context.Context, in *ethpb.MultipleValidatorStatusRequest) (*ethpb.MultipleValidatorStatusResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.MultipleValidatorStatus")
defer span.End()

return wrapInMetrics[*ethpb.MultipleValidatorStatusResponse]("MultipleValidatorStatus", func() (*ethpb.MultipleValidatorStatusResponse, error) {
return c.multipleValidatorStatus(ctx, in)
})
}

func (c *beaconApiValidatorClient) PrepareBeaconProposer(ctx context.Context, in *ethpb.PrepareBeaconProposerRequest) (*empty.Empty, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.PrepareBeaconProposer")
defer span.End()

return wrapInMetrics[*empty.Empty]("PrepareBeaconProposer", func() (*empty.Empty, error) {
return new(empty.Empty), c.prepareBeaconProposer(ctx, in.Recipients)
})
}

func (c *beaconApiValidatorClient) ProposeAttestation(ctx context.Context, in *ethpb.Attestation) (*ethpb.AttestResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.ProposeAttestation")
defer span.End()

return wrapInMetrics[*ethpb.AttestResponse]("ProposeAttestation", func() (*ethpb.AttestResponse, error) {
return c.proposeAttestation(ctx, in)
})
}

func (c *beaconApiValidatorClient) ProposeBeaconBlock(ctx context.Context, in *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.ProposeBeaconBlock")
defer span.End()

return wrapInMetrics[*ethpb.ProposeResponse]("ProposeBeaconBlock", func() (*ethpb.ProposeResponse, error) {
return c.proposeBeaconBlock(ctx, in)
})
}

func (c *beaconApiValidatorClient) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.ProposeExit")
defer span.End()

return wrapInMetrics[*ethpb.ProposeExitResponse]("ProposeExit", func() (*ethpb.ProposeExitResponse, error) {
return c.proposeExit(ctx, in)
})
Expand All @@ -138,52 +177,79 @@ func (c *beaconApiValidatorClient) StreamBlocksAltair(ctx context.Context, in *e
}

func (c *beaconApiValidatorClient) SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest, index primitives.ValidatorIndex, committeeLength uint64) (*ethpb.AggregateSelectionResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SubmitAggregateSelectionProof")
defer span.End()

return wrapInMetrics[*ethpb.AggregateSelectionResponse]("SubmitAggregateSelectionProof", func() (*ethpb.AggregateSelectionResponse, error) {
return c.submitAggregateSelectionProof(ctx, in, index, committeeLength)
})
}

func (c *beaconApiValidatorClient) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SubmitSignedAggregateSelectionProof")
defer span.End()

return wrapInMetrics[*ethpb.SignedAggregateSubmitResponse]("SubmitSignedAggregateSelectionProof", func() (*ethpb.SignedAggregateSubmitResponse, error) {
return c.submitSignedAggregateSelectionProof(ctx, in)
})
}

func (c *beaconApiValidatorClient) SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SubmitSignedContributionAndProof")
defer span.End()

return wrapInMetrics[*empty.Empty]("SubmitSignedContributionAndProof", func() (*empty.Empty, error) {
return new(empty.Empty), c.submitSignedContributionAndProof(ctx, in)
})
}

func (c *beaconApiValidatorClient) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SubmitSyncMessage")
defer span.End()

return wrapInMetrics[*empty.Empty]("SubmitSyncMessage", func() (*empty.Empty, error) {
return new(empty.Empty), c.submitSyncMessage(ctx, in)
})
}

func (c *beaconApiValidatorClient) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SubmitValidatorRegistrations")
defer span.End()

return wrapInMetrics[*empty.Empty]("SubmitValidatorRegistrations", func() (*empty.Empty, error) {
return new(empty.Empty), c.submitValidatorRegistrations(ctx, in.Messages)
})
}

func (c *beaconApiValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, duties []*ethpb.DutiesResponse_Duty) (*empty.Empty, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.SubscribeCommitteeSubnets")
defer span.End()

return wrapInMetrics[*empty.Empty]("SubscribeCommitteeSubnets", func() (*empty.Empty, error) {
return new(empty.Empty), c.subscribeCommitteeSubnets(ctx, in, duties)
})
}

func (c *beaconApiValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.ValidatorIndex")
defer span.End()

return wrapInMetrics[*ethpb.ValidatorIndexResponse]("ValidatorIndex", func() (*ethpb.ValidatorIndexResponse, error) {
return c.validatorIndex(ctx, in)
})
}

func (c *beaconApiValidatorClient) ValidatorStatus(ctx context.Context, in *ethpb.ValidatorStatusRequest) (*ethpb.ValidatorStatusResponse, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.ValidatorStatus")
defer span.End()

return c.validatorStatus(ctx, in)
}

func (c *beaconApiValidatorClient) WaitForActivation(ctx context.Context, in *ethpb.ValidatorActivationRequest) (ethpb.BeaconNodeValidator_WaitForActivationClient, error) {
ctx, span := trace.StartSpan(ctx, "beacon-api.WaitForActivation")
defer span.End()

return c.waitForActivation(ctx, in)
}

Expand Down Expand Up @@ -212,11 +278,21 @@ func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
}

func (c *beaconApiValidatorClient) AggregatedSelections(ctx context.Context, selections []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
return c.aggregatedSelection(ctx, selections)
ctx, span := trace.StartSpan(ctx, "beacon-api.AggregatedSelections")
defer span.End()

return wrapInMetrics[[]iface.BeaconCommitteeSelection]("AggregatedSelections", func() ([]iface.BeaconCommitteeSelection, error) {
return c.aggregatedSelection(ctx, selections)
})
}

func (c *beaconApiValidatorClient) AggregatedSyncSelections(ctx context.Context, selections []iface.SyncCommitteeSelection) ([]iface.SyncCommitteeSelection, error) {
return c.aggregatedSyncSelections(ctx, selections)
ctx, span := trace.StartSpan(ctx, "beacon-api.AggregatedSyncSelections")
defer span.End()

return wrapInMetrics[[]iface.SyncCommitteeSelection]("AggregatedSyncSelections", func() ([]iface.SyncCommitteeSelection, error) {
return c.aggregatedSyncSelections(ctx, selections)
})
}

func wrapInMetrics[Resp any](action string, f func() (Resp, error)) (Resp, error) {
Expand Down
18 changes: 18 additions & 0 deletions validator/client/propose.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ func CreateSignedVoluntaryExit(

// Sign randao reveal with randao domain and private key.
func (v *validator) signRandaoReveal(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, epoch primitives.Epoch, slot primitives.Slot) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.signRandaoReveal")
defer span.End()

domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainRandao[:])
if err != nil {
return nil, errors.Wrap(err, domainDataErr)
Expand Down Expand Up @@ -359,6 +362,9 @@ func (v *validator) signRandaoReveal(ctx context.Context, pubKey [fieldparams.BL
// Sign block with proposer domain and private key.
// Returns the signature, block signing root, and any error.
func (v *validator) signBlock(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, epoch primitives.Epoch, slot primitives.Slot, b interfaces.ReadOnlyBeaconBlock) ([]byte, [32]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.signBlock")
defer span.End()

domain, err := v.domainData(ctx, epoch, params.BeaconConfig().DomainBeaconProposer[:])
if err != nil {
return nil, [32]byte{}, errors.Wrap(err, domainDataErr)
Expand Down Expand Up @@ -397,6 +403,9 @@ func signVoluntaryExit(
exit *ethpb.VoluntaryExit,
slot primitives.Slot,
) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.signVoluntaryExit")
defer span.End()

req := &ethpb.DomainRequest{
Epoch: exit.Epoch,
Domain: params.BeaconConfig().DomainVoluntaryExit[:],
Expand Down Expand Up @@ -430,6 +439,9 @@ func signVoluntaryExit(

// Graffiti gets the graffiti from cli or file for the validator public key.
func (v *validator) Graffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.Graffiti")
defer span.End()

if v.proposerSettings != nil {
// Check proposer settings for specific key first
if v.proposerSettings.ProposeConfig != nil {
Expand Down Expand Up @@ -493,6 +505,9 @@ func (v *validator) Graffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyL
}

func (v *validator) SetGraffiti(ctx context.Context, pubkey [fieldparams.BLSPubkeyLength]byte, graffiti []byte) error {
ctx, span := trace.StartSpan(ctx, "validator.SetGraffiti")
defer span.End()

if graffiti == nil {
return nil
}
Expand All @@ -518,6 +533,9 @@ func (v *validator) SetGraffiti(ctx context.Context, pubkey [fieldparams.BLSPubk
}

func (v *validator) DeleteGraffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte) error {
ctx, span := trace.StartSpan(ctx, "validator.DeleteGraffiti")
defer span.End()

if v.proposerSettings == nil || v.proposerSettings.ProposeConfig == nil {
return errors.New("attempted to delete graffiti without proposer settings, graffiti will default to flag options")
}
Expand Down
6 changes: 6 additions & 0 deletions validator/client/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func SubmitValidatorRegistrations(

// Sings validator registration obj with the proposer domain and private key.
func signValidatorRegistration(ctx context.Context, signer iface.SigningFunc, reg *ethpb.ValidatorRegistrationV1) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.signValidatorRegistration")
defer span.End()

// Per spec, we want the fork version and genesis validator to be nil.
// Which is genesis value and zero by default.
d, err := signing.ComputeDomain(
Expand Down Expand Up @@ -91,6 +94,9 @@ func signValidatorRegistration(ctx context.Context, signer iface.SigningFunc, re

// SignValidatorRegistrationRequest compares and returns either the cached validator registration request or signs a new one.
func (v *validator) SignValidatorRegistrationRequest(ctx context.Context, signer iface.SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) {
ctx, span := trace.StartSpan(ctx, "validator.SignValidatorRegistrationRequest")
defer span.End()

signedReg, ok := v.signedValidatorRegistrations[bytesutil.ToBytes48(newValidatorRegistration.Pubkey)]
if ok && isValidatorRegistrationSame(signedReg.Message, newValidatorRegistration) {
return signedReg, nil
Expand Down
3 changes: 3 additions & 0 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byt
}

func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (primitives.Slot, error) {
ctx, span := trace.StartSpan(ctx, "validator.initializeValidatorAndGetHeadSlot")
defer span.End()

ticker := time.NewTicker(backOffPeriod)
defer ticker.Stop()

Expand Down
9 changes: 9 additions & 0 deletions validator/client/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func (v *validator) SubmitSignedContributionAndProof(ctx context.Context, slot p

// Signs and returns selection proofs per validator for slot and pub key.
func (v *validator) selectionProofs(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, indexRes *ethpb.SyncSubcommitteeIndexResponse, validatorIndex primitives.ValidatorIndex) ([][]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.selectionProofs")
defer span.End()

selectionProofs := make([][]byte, len(indexRes.Indices))
cfg := params.BeaconConfig()
size := cfg.SyncCommitteeSize
Expand Down Expand Up @@ -231,6 +234,9 @@ func (v *validator) selectionProofs(ctx context.Context, slot primitives.Slot, p

// Signs input slot with domain sync committee selection proof. This is used to create the signature for sync committee selection.
func (v *validator) signSyncSelectionData(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, index uint64, slot primitives.Slot) (signature []byte, err error) {
ctx, span := trace.StartSpan(ctx, "validator.signSyncSelectionData")
defer span.End()

domain, err := v.domainData(ctx, slots.ToEpoch(slot), params.BeaconConfig().DomainSyncCommitteeSelectionProof[:])
if err != nil {
return nil, err
Expand Down Expand Up @@ -258,6 +264,9 @@ func (v *validator) signSyncSelectionData(ctx context.Context, pubKey [fieldpara

// This returns the signature of validator signing over sync committee contribution and proof object.
func (v *validator) signContributionAndProof(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, c *ethpb.ContributionAndProof, slot primitives.Slot) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.signContributionAndProof")
defer span.End()

d, err := v.domainData(ctx, slots.ToEpoch(c.Contribution.Slot), params.BeaconConfig().DomainContributionAndProof[:])
if err != nil {
return nil, err
Expand Down
Loading
Loading