From fbaee13ec45ca9335f97fa1a0f1029ef117e0ce6 Mon Sep 17 00:00:00 2001 From: rkapka Date: Thu, 21 Mar 2024 17:11:17 +0900 Subject: [PATCH 1/5] wait groups --- validator/client/beacon-api/duties.go | 256 +++++++++++++++++--------- 1 file changed, 165 insertions(+), 91 deletions(-) diff --git a/validator/client/beacon-api/duties.go b/validator/client/beacon-api/duties.go index 7b635ccc9671..d6110d99d580 100644 --- a/validator/client/beacon-api/duties.go +++ b/validator/client/beacon-api/duties.go @@ -7,11 +7,14 @@ import ( "fmt" "net/url" "strconv" + "sync" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/api/server/structs" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/consensus-types/validator" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" ) @@ -31,37 +34,45 @@ type committeeIndexSlotPair struct { slot primitives.Slot } +type validatorForDuty struct { + pubkey []byte + index primitives.ValidatorIndex + status ethpb.ValidatorStatus +} + func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) { - all, err := c.multipleValidatorStatus(ctx, ðpb.MultipleValidatorStatusRequest{PublicKeys: in.PublicKeys}) + vals, err := c.getValidatorsForDuties(ctx, in.PublicKeys) if err != nil { - return nil, errors.Wrap(err, "failed to get validator status") - } - known := ðpb.MultipleValidatorStatusResponse{ - PublicKeys: make([][]byte, 0, len(all.PublicKeys)), - Statuses: make([]*ethpb.ValidatorStatusResponse, 0, len(all.Statuses)), - Indices: make([]primitives.ValidatorIndex, 0, len(all.Indices)), - } - for i, status := range all.Statuses { - if status.Status != ethpb.ValidatorStatus_UNKNOWN_STATUS { - known.PublicKeys = append(known.PublicKeys, all.PublicKeys[i]) - known.Statuses = append(known.Statuses, all.Statuses[i]) - known.Indices = append(known.Indices, all.Indices[i]) - } + return nil, errors.Wrap(err, "failed to get validators for duties") } // Sync committees are an Altair feature fetchSyncDuties := in.Epoch >= params.BeaconConfig().AltairForkEpoch - currentEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch, known, fetchSyncDuties) - if err != nil { - return nil, errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch) - } + var wg sync.WaitGroup + errChan := make(chan error, 1) + wg.Add(1) - nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, known, fetchSyncDuties) + var currentEpochDuties []*ethpb.DutiesResponse_Duty + go func() { + defer wg.Done() + + currentEpochDuties, err = c.getDutiesForEpoch(ctx, in.Epoch, vals, fetchSyncDuties) + if err != nil { + errChan <- errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch) + } + }() + + nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, vals, fetchSyncDuties) if err != nil { return nil, errors.Wrapf(err, "failed to get duties for next epoch `%d`", in.Epoch+1) } + wg.Wait() + if err = <-errChan; err != nil { + return nil, err + } + return ðpb.DutiesResponse{ CurrentEpochDuties: currentEpochDuties, NextEpochDuties: nextEpochDuties, @@ -71,103 +82,120 @@ func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.Dutie func (c beaconApiValidatorClient) getDutiesForEpoch( ctx context.Context, epoch primitives.Epoch, - multipleValidatorStatus *ethpb.MultipleValidatorStatusResponse, + vals []validatorForDuty, fetchSyncDuties bool, ) ([]*ethpb.DutiesResponse_Duty, error) { - attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, multipleValidatorStatus.Indices) - if err != nil { - return nil, errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch) + indices := make([]primitives.ValidatorIndex, len(vals)) + for i, v := range vals { + indices[i] = v.index } - var syncDuties []*structs.SyncCommitteeDuty + goroutineCount := 2 if fetchSyncDuties { - if syncDuties, err = c.dutiesProvider.GetSyncDuties(ctx, epoch, multipleValidatorStatus.Indices); err != nil { - return nil, errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch) - } - } - - var proposerDuties []*structs.ProposerDuty - if proposerDuties, err = c.dutiesProvider.GetProposerDuties(ctx, epoch); err != nil { - return nil, errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch) - } - - committees, err := c.dutiesProvider.GetCommittees(ctx, epoch) - if err != nil { - return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch) - } - slotCommittees := make(map[string]uint64) - for _, c := range committees { - n, ok := slotCommittees[c.Slot] - if !ok { - n = 0 - } - slotCommittees[c.Slot] = n + 1 + goroutineCount = 3 } + var wg sync.WaitGroup + errChan := make(chan error, goroutineCount) + wg.Add(goroutineCount) // Mapping from a validator index to its attesting committee's index and slot attesterDutiesMapping := make(map[primitives.ValidatorIndex]committeeIndexSlotPair) - for _, attesterDuty := range attesterDuties { - validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex) - } + go func() { + defer wg.Done() - slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64) + attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, indices) if err != nil { - return nil, errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot) + errChan <- errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch) } - committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex) + for _, attesterDuty := range attesterDuties { + validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64) + if err != nil { + errChan <- errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex) + } + slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64) + if err != nil { + errChan <- errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot) + } + committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64) + if err != nil { + errChan <- errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex) + } + attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{ + slot: primitives.Slot(slot), + committeeIndex: primitives.CommitteeIndex(committeeIndex), + } } + }() - attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{ - slot: primitives.Slot(slot), - committeeIndex: primitives.CommitteeIndex(committeeIndex), - } + // Set containing all validator indices that are part of a sync committee for this epoch + syncDutiesMapping := make(map[primitives.ValidatorIndex]bool) + if fetchSyncDuties { + go func() { + defer wg.Done() + + syncDuties, err := c.dutiesProvider.GetSyncDuties(ctx, epoch, indices) + if err != nil { + errChan <- errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch) + } + + for _, syncDuty := range syncDuties { + validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64) + if err != nil { + errChan <- errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex) + } + syncDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = true + } + }() } // Mapping from a validator index to its proposal slot proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot) - for _, proposerDuty := range proposerDuties { - validatorIndex, err := strconv.ParseUint(proposerDuty.ValidatorIndex, 10, 64) + go func() { + defer wg.Done() + + proposerDuties, err := c.dutiesProvider.GetProposerDuties(ctx, epoch) if err != nil { - return nil, errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex) + errChan <- errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch) } - slot, err := strconv.ParseUint(proposerDuty.Slot, 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot) + for _, proposerDuty := range proposerDuties { + validatorIndex, err := strconv.ParseUint(proposerDuty.ValidatorIndex, 10, 64) + if err != nil { + errChan <- errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex) + } + slot, err := strconv.ParseUint(proposerDuty.Slot, 10, 64) + if err != nil { + errChan <- errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot) + } + proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] = append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot)) } + }() - proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] = append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot)) + // Mapping from the {committeeIndex, slot} to each of the committee's validator indices + committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex) + committees, err := c.dutiesProvider.GetCommittees(ctx, epoch) + if err != nil { + return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch) } - - // Set containing all validator indices that are part of a sync committee for this epoch - syncDutiesMapping := make(map[primitives.ValidatorIndex]bool) - for _, syncDuty := range syncDuties { - validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex) + slotCommittees := make(map[string]uint64) + for _, c := range committees { + n, ok := slotCommittees[c.Slot] + if !ok { + n = 0 } - - syncDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = true + slotCommittees[c.Slot] = n + 1 } - // Mapping from the {committeeIndex, slot} to each of the committee's validator indices - committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex) for _, committee := range committees { committeeIndex, err := strconv.ParseUint(committee.Index, 10, 64) if err != nil { return nil, errors.Wrapf(err, "failed to parse committee index `%s`", committee.Index) } - slot, err := strconv.ParseUint(committee.Slot, 10, 64) if err != nil { return nil, errors.Wrapf(err, "failed to parse slot `%s`", committee.Slot) } - validatorIndices := make([]primitives.ValidatorIndex, len(committee.Validators)) for index, validatorIndexString := range committee.Validators { validatorIndex, err := strconv.ParseUint(validatorIndexString, 10, 64) @@ -176,7 +204,6 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( } validatorIndices[index] = primitives.ValidatorIndex(validatorIndex) } - key := committeeIndexSlotPair{ committeeIndex: primitives.CommitteeIndex(committeeIndex), slot: primitives.Slot(slot), @@ -184,16 +211,18 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( committeeMapping[key] = validatorIndices } - duties := make([]*ethpb.DutiesResponse_Duty, len(multipleValidatorStatus.Statuses)) - for index, validatorStatus := range multipleValidatorStatus.Statuses { - validatorIndex := multipleValidatorStatus.Indices[index] - pubkey := multipleValidatorStatus.PublicKeys[index] + wg.Wait() + if err := <-errChan; err != nil { + return nil, err + } + duties := make([]*ethpb.DutiesResponse_Duty, len(vals)) + for i, v := range vals { var attesterSlot primitives.Slot var committeeIndex primitives.CommitteeIndex var committeeValidatorIndices []primitives.ValidatorIndex - if committeeMappingKey, ok := attesterDutiesMapping[validatorIndex]; ok { + if committeeMappingKey, ok := attesterDutiesMapping[v.index]; ok { committeeIndex = committeeMappingKey.committeeIndex attesterSlot = committeeMappingKey.slot @@ -202,15 +231,15 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( } } - duties[index] = ðpb.DutiesResponse_Duty{ + duties[i] = ðpb.DutiesResponse_Duty{ Committee: committeeValidatorIndices, CommitteeIndex: committeeIndex, AttesterSlot: attesterSlot, - ProposerSlots: proposerDutySlots[validatorIndex], - PublicKey: pubkey, - Status: validatorStatus.Status, - ValidatorIndex: validatorIndex, - IsSyncCommittee: syncDutiesMapping[validatorIndex], + ProposerSlots: proposerDutySlots[v.index], + PublicKey: v.pubkey, + Status: v.status, + ValidatorIndex: v.index, + IsSyncCommittee: syncDutiesMapping[v.index], CommitteesAtSlot: slotCommittees[strconv.FormatUint(uint64(attesterSlot), 10)], } } @@ -218,6 +247,51 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( return duties, nil } +func (c *beaconApiValidatorClient) getValidatorsForDuties(ctx context.Context, pubkeys [][]byte) ([]validatorForDuty, error) { + vals := make([]validatorForDuty, 0, len(pubkeys)) + stringPubkeysToPubkeys := make(map[string][]byte, len(pubkeys)) + stringPubkeys := make([]string, len(pubkeys)) + + for i, pk := range pubkeys { + stringPk := hexutil.Encode(pk) + stringPubkeysToPubkeys[stringPk] = pk + stringPubkeys[i] = stringPk + } + + statusesWithDuties := []string{validator.ActiveOngoing.String(), validator.ActiveExiting.String()} + stateValidatorsResponse, err := c.stateValidatorsProvider.GetStateValidators(ctx, stringPubkeys, nil, statusesWithDuties) + if err != nil { + return nil, errors.Wrap(err, "failed to get state validators") + } + + for _, validatorContainer := range stateValidatorsResponse.Data { + val := validatorForDuty{} + + validatorIndex, err := strconv.ParseUint(validatorContainer.Index, 10, 64) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse validator index %s", validatorContainer.Index) + } + val.index = primitives.ValidatorIndex(validatorIndex) + + stringPubkey := validatorContainer.Validator.Pubkey + pubkey, ok := stringPubkeysToPubkeys[stringPubkey] + if !ok { + return nil, errors.Wrapf(err, "returned public key %s not requested", stringPubkey) + } + val.pubkey = pubkey + + status, ok := beaconAPITogRPCValidatorStatus[validatorContainer.Status] + if !ok { + return nil, errors.New("invalid validator status " + validatorContainer.Status) + } + val.status = status + + vals = append(vals, val) + } + + return vals, nil +} + // GetCommittees retrieves the committees for the given epoch func (c beaconApiDutiesProvider) GetCommittees(ctx context.Context, epoch primitives.Epoch) ([]*structs.Committee, error) { committeeParams := url.Values{} From 7e6ca886450bba614ed69449f497b3fda8a773e7 Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 22 Mar 2024 15:35:07 +0900 Subject: [PATCH 2/5] errgroup --- validator/client/beacon-api/duties.go | 71 +++++++++++---------------- 1 file changed, 29 insertions(+), 42 deletions(-) diff --git a/validator/client/beacon-api/duties.go b/validator/client/beacon-api/duties.go index d6110d99d580..db162d003b00 100644 --- a/validator/client/beacon-api/duties.go +++ b/validator/client/beacon-api/duties.go @@ -7,7 +7,6 @@ import ( "fmt" "net/url" "strconv" - "sync" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" @@ -16,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/consensus-types/validator" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "golang.org/x/sync/errgroup" ) type dutiesProvider interface { @@ -49,27 +49,23 @@ func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.Dutie // Sync committees are an Altair feature fetchSyncDuties := in.Epoch >= params.BeaconConfig().AltairForkEpoch - var wg sync.WaitGroup - errChan := make(chan error, 1) - wg.Add(1) + var wg errgroup.Group var currentEpochDuties []*ethpb.DutiesResponse_Duty - go func() { - defer wg.Done() - + wg.Go(func() error { currentEpochDuties, err = c.getDutiesForEpoch(ctx, in.Epoch, vals, fetchSyncDuties) if err != nil { - errChan <- errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch) + return errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch) } - }() + return nil + }) nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, vals, fetchSyncDuties) if err != nil { return nil, errors.Wrapf(err, "failed to get duties for next epoch `%d`", in.Epoch+1) } - wg.Wait() - if err = <-errChan; err != nil { + if err = wg.Wait(); err != nil { return nil, err } @@ -90,87 +86,79 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( indices[i] = v.index } - goroutineCount := 2 - if fetchSyncDuties { - goroutineCount = 3 - } - var wg sync.WaitGroup - errChan := make(chan error, goroutineCount) - wg.Add(goroutineCount) + var wg errgroup.Group // Mapping from a validator index to its attesting committee's index and slot attesterDutiesMapping := make(map[primitives.ValidatorIndex]committeeIndexSlotPair) - go func() { - defer wg.Done() - + wg.Go(func() error { attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, indices) if err != nil { - errChan <- errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch) + return errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch) } for _, attesterDuty := range attesterDuties { validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64) if err != nil { - errChan <- errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex) + return errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex) } slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64) if err != nil { - errChan <- errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot) + return errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot) } committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64) if err != nil { - errChan <- errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex) + return errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex) } attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{ slot: primitives.Slot(slot), committeeIndex: primitives.CommitteeIndex(committeeIndex), } } - }() + return nil + }) // Set containing all validator indices that are part of a sync committee for this epoch syncDutiesMapping := make(map[primitives.ValidatorIndex]bool) if fetchSyncDuties { - go func() { - defer wg.Done() - + wg.Go(func() error { syncDuties, err := c.dutiesProvider.GetSyncDuties(ctx, epoch, indices) if err != nil { - errChan <- errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch) + return errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch) } for _, syncDuty := range syncDuties { validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64) if err != nil { - errChan <- errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex) + return errors.Wrapf(err, "failed to parse sync validator index `%s`", syncDuty.ValidatorIndex) } syncDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = true } - }() + return nil + }) } // Mapping from a validator index to its proposal slot proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot) - go func() { - defer wg.Done() - + wg.Go(func() error { proposerDuties, err := c.dutiesProvider.GetProposerDuties(ctx, epoch) if err != nil { - errChan <- errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch) + return errors.Wrapf(err, "failed to get proposer duties for epoch `%d`", epoch) } for _, proposerDuty := range proposerDuties { validatorIndex, err := strconv.ParseUint(proposerDuty.ValidatorIndex, 10, 64) if err != nil { - errChan <- errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex) + return errors.Wrapf(err, "failed to parse proposer validator index `%s`", proposerDuty.ValidatorIndex) } slot, err := strconv.ParseUint(proposerDuty.Slot, 10, 64) if err != nil { - errChan <- errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot) + return errors.Wrapf(err, "failed to parse proposer slot `%s`", proposerDuty.Slot) } - proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] = append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot)) + proposerDutySlots[primitives.ValidatorIndex(validatorIndex)] = + append(proposerDutySlots[primitives.ValidatorIndex(validatorIndex)], primitives.Slot(slot)) } - }() + return nil + }) // Mapping from the {committeeIndex, slot} to each of the committee's validator indices committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex) @@ -211,8 +199,7 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( committeeMapping[key] = validatorIndices } - wg.Wait() - if err := <-errChan; err != nil { + if err = wg.Wait(); err != nil { return nil, err } From bf1ffbf33df4f3a299316e5a5203e64a856113b1 Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 22 Mar 2024 15:35:13 +0900 Subject: [PATCH 3/5] tests --- validator/client/beacon-api/duties_test.go | 215 ++++++++------------- 1 file changed, 79 insertions(+), 136 deletions(-) diff --git a/validator/client/beacon-api/duties_test.go b/validator/client/beacon-api/duties_test.go index 896f2e02e3fb..e29aab292ab2 100644 --- a/validator/client/beacon-api/duties_test.go +++ b/validator/client/beacon-api/duties_test.go @@ -9,11 +9,8 @@ import ( "strconv" "testing" - "github.com/prysmaticlabs/prysm/v5/api/server/structs" - validatormock "github.com/prysmaticlabs/prysm/v5/testing/validator-mock" - "github.com/prysmaticlabs/prysm/v5/validator/client/iface" - "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/prysmaticlabs/prysm/v5/api/server/structs" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" @@ -541,7 +538,6 @@ func TestGetDutiesForEpoch_Error(t *testing.T) { { name: "get proposer duties failed", expectedError: "failed to get proposer duties for epoch `1`: foo error", - fetchAttesterDutiesError: nil, fetchProposerDutiesError: errors.New("foo error"), }, { @@ -720,28 +716,20 @@ func TestGetDutiesForEpoch_Error(t *testing.T) { testCase.fetchCommitteesError, ).AnyTimes() + vals := make([]validatorForDuty, len(pubkeys)) + for i := 0; i < len(pubkeys); i++ { + vals[i] = validatorForDuty{ + pubkey: pubkeys[i], + index: validatorIndices[i], + status: ethpb.ValidatorStatus_ACTIVE, + } + } + validatorClient := &beaconApiValidatorClient{dutiesProvider: dutiesProvider} _, err := validatorClient.getDutiesForEpoch( ctx, epoch, - ðpb.MultipleValidatorStatusResponse{ - PublicKeys: pubkeys, - Indices: validatorIndices, - Statuses: []*ethpb.ValidatorStatusResponse{ - {Status: ethpb.ValidatorStatus_UNKNOWN_STATUS}, - {Status: ethpb.ValidatorStatus_DEPOSITED}, - {Status: ethpb.ValidatorStatus_PENDING}, - {Status: ethpb.ValidatorStatus_ACTIVE}, - {Status: ethpb.ValidatorStatus_EXITING}, - {Status: ethpb.ValidatorStatus_SLASHING}, - {Status: ethpb.ValidatorStatus_EXITED}, - {Status: ethpb.ValidatorStatus_INVALID}, - {Status: ethpb.ValidatorStatus_PARTIALLY_DEPOSITED}, - {Status: ethpb.ValidatorStatus_UNKNOWN_STATUS}, - {Status: ethpb.ValidatorStatus_DEPOSITED}, - {Status: ethpb.ValidatorStatus_PENDING}, - }, - }, + vals, true, ) assert.ErrorContains(t, testCase.expectedError, err) @@ -773,40 +761,6 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { committeeSlots := []primitives.Slot{28, 29, 30} proposerSlots := []primitives.Slot{31, 32, 33, 34, 35, 36, 37, 38} - statuses := []ethpb.ValidatorStatus{ - ethpb.ValidatorStatus_UNKNOWN_STATUS, - ethpb.ValidatorStatus_DEPOSITED, - ethpb.ValidatorStatus_PENDING, - ethpb.ValidatorStatus_ACTIVE, - ethpb.ValidatorStatus_EXITING, - ethpb.ValidatorStatus_SLASHING, - ethpb.ValidatorStatus_EXITED, - ethpb.ValidatorStatus_INVALID, - ethpb.ValidatorStatus_PARTIALLY_DEPOSITED, - ethpb.ValidatorStatus_UNKNOWN_STATUS, - ethpb.ValidatorStatus_DEPOSITED, - ethpb.ValidatorStatus_PENDING, - } - - multipleValidatorStatus := ðpb.MultipleValidatorStatusResponse{ - PublicKeys: pubkeys, - Indices: validatorIndices, - Statuses: []*ethpb.ValidatorStatusResponse{ - {Status: statuses[0]}, - {Status: statuses[1]}, - {Status: statuses[2]}, - {Status: statuses[3]}, - {Status: statuses[4]}, - {Status: statuses[5]}, - {Status: statuses[6]}, - {Status: statuses[7]}, - {Status: statuses[8]}, - {Status: statuses[9]}, - {Status: statuses[10]}, - {Status: statuses[11]}, - }, - } - ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -824,7 +778,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { dutiesProvider.EXPECT().GetAttesterDuties( ctx, epoch, - multipleValidatorStatus.Indices, + validatorIndices, ).Return( generateValidAttesterDuties(pubkeys, validatorIndices, committeeIndices, committeeSlots), nil, @@ -842,7 +796,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { dutiesProvider.EXPECT().GetSyncDuties( ctx, epoch, - multipleValidatorStatus.Indices, + validatorIndices, ).Return( generateValidSyncDuties(pubkeys, validatorIndices), nil, @@ -883,7 +837,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { CommitteeIndex: committeeIndices[0], AttesterSlot: committeeSlots[0], PublicKey: pubkeys[0], - Status: statuses[0], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[0], CommitteesAtSlot: 1, }, @@ -895,7 +849,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { CommitteeIndex: committeeIndices[0], AttesterSlot: committeeSlots[0], PublicKey: pubkeys[1], - Status: statuses[1], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[1], CommitteesAtSlot: 1, }, @@ -907,7 +861,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { CommitteeIndex: committeeIndices[1], AttesterSlot: committeeSlots[1], PublicKey: pubkeys[2], - Status: statuses[2], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[2], CommitteesAtSlot: 1, }, @@ -919,7 +873,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { CommitteeIndex: committeeIndices[1], AttesterSlot: committeeSlots[1], PublicKey: pubkeys[3], - Status: statuses[3], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[3], CommitteesAtSlot: 1, }, @@ -931,7 +885,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { CommitteeIndex: committeeIndices[2], AttesterSlot: committeeSlots[2], PublicKey: pubkeys[4], - Status: statuses[4], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[4], ProposerSlots: expectedProposerSlots1, CommitteesAtSlot: 1, @@ -944,7 +898,7 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { CommitteeIndex: committeeIndices[2], AttesterSlot: committeeSlots[2], PublicKey: pubkeys[5], - Status: statuses[5], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[5], ProposerSlots: expectedProposerSlots2, IsSyncCommittee: testCase.fetchSyncDuties, @@ -952,47 +906,55 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { }, { PublicKey: pubkeys[6], - Status: statuses[6], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[6], ProposerSlots: expectedProposerSlots3, IsSyncCommittee: testCase.fetchSyncDuties, }, { PublicKey: pubkeys[7], - Status: statuses[7], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[7], ProposerSlots: expectedProposerSlots4, IsSyncCommittee: testCase.fetchSyncDuties, }, { PublicKey: pubkeys[8], - Status: statuses[8], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[8], IsSyncCommittee: testCase.fetchSyncDuties, }, { PublicKey: pubkeys[9], - Status: statuses[9], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[9], IsSyncCommittee: testCase.fetchSyncDuties, }, { PublicKey: pubkeys[10], - Status: statuses[10], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[10], }, { PublicKey: pubkeys[11], - Status: statuses[11], + Status: ethpb.ValidatorStatus_ACTIVE, ValidatorIndex: validatorIndices[11], }, } validatorClient := &beaconApiValidatorClient{dutiesProvider: dutiesProvider} + vals := make([]validatorForDuty, len(pubkeys)) + for i := 0; i < len(pubkeys); i++ { + vals[i] = validatorForDuty{ + pubkey: pubkeys[i], + index: validatorIndices[i], + status: ethpb.ValidatorStatus_ACTIVE, + } + } duties, err := validatorClient.getDutiesForEpoch( ctx, epoch, - multipleValidatorStatus, + vals, testCase.fetchSyncDuties, ) require.NoError(t, err) @@ -1018,41 +980,24 @@ func TestGetDuties_Valid(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - statuses := []ethpb.ValidatorStatus{ - ethpb.ValidatorStatus_DEPOSITED, - ethpb.ValidatorStatus_PENDING, - ethpb.ValidatorStatus_ACTIVE, - ethpb.ValidatorStatus_EXITING, - ethpb.ValidatorStatus_SLASHING, - ethpb.ValidatorStatus_EXITED, - ethpb.ValidatorStatus_EXITED, - ethpb.ValidatorStatus_EXITED, - ethpb.ValidatorStatus_EXITED, - ethpb.ValidatorStatus_DEPOSITED, - ethpb.ValidatorStatus_PENDING, - ethpb.ValidatorStatus_ACTIVE, - } - pubkeys := make([][]byte, len(statuses)) - validatorIndices := make([]primitives.ValidatorIndex, len(statuses)) - for i := range statuses { + valCount := 12 + pubkeys := make([][]byte, valCount) + validatorIndices := make([]primitives.ValidatorIndex, valCount) + vals := make([]validatorForDuty, valCount) + for i := 0; i < valCount; i++ { pubkeys[i] = []byte(strconv.Itoa(i)) validatorIndices[i] = primitives.ValidatorIndex(i) + vals[i] = validatorForDuty{ + pubkey: pubkeys[i], + index: validatorIndices[i], + status: ethpb.ValidatorStatus_ACTIVE, + } } committeeIndices := []primitives.CommitteeIndex{25, 26, 27} committeeSlots := []primitives.Slot{28, 29, 30} proposerSlots := []primitives.Slot{31, 32, 33, 34, 35, 36, 37, 38} - statusResps := make([]*ethpb.ValidatorStatusResponse, len(statuses)) - for i, s := range statuses { - statusResps[i] = ðpb.ValidatorStatusResponse{Status: s} - } - multipleValidatorStatus := ðpb.MultipleValidatorStatusResponse{ - PublicKeys: pubkeys, - Indices: validatorIndices, - Statuses: statusResps, - } - ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1070,7 +1015,7 @@ func TestGetDuties_Valid(t *testing.T) { dutiesProvider.EXPECT().GetAttesterDuties( ctx, testCase.epoch, - multipleValidatorStatus.Indices, + validatorIndices, ).Return( generateValidAttesterDuties(pubkeys, validatorIndices, committeeIndices, committeeSlots), nil, @@ -1089,7 +1034,7 @@ func TestGetDuties_Valid(t *testing.T) { dutiesProvider.EXPECT().GetSyncDuties( ctx, testCase.epoch, - multipleValidatorStatus.Indices, + validatorIndices, ).Return( generateValidSyncDuties(pubkeys, validatorIndices), nil, @@ -1143,7 +1088,7 @@ func TestGetDuties_Valid(t *testing.T) { Data: []*structs.ValidatorContainer{ { Index: strconv.FormatUint(uint64(validatorIndices[0]), 10), - Status: "pending_initialized", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[0]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1151,7 +1096,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[1]), 10), - Status: "pending_queued", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[1]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1167,7 +1112,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[3]), 10), - Status: "active_exiting", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[3]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1175,7 +1120,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[4]), 10), - Status: "active_slashed", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[4]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1183,7 +1128,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[5]), 10), - Status: "exited_unslashed", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[5]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1191,7 +1136,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[6]), 10), - Status: "exited_slashed", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[6]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1199,7 +1144,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[7]), 10), - Status: "withdrawal_possible", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[7]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1207,7 +1152,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[8]), 10), - Status: "withdrawal_done", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[8]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1215,7 +1160,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[9]), 10), - Status: "pending_initialized", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[9]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1223,7 +1168,7 @@ func TestGetDuties_Valid(t *testing.T) { }, { Index: strconv.FormatUint(uint64(validatorIndices[10]), 10), - Status: "pending_queued", + Status: "active_ongoing", Validator: &structs.Validator{ Pubkey: hexutil.Encode(pubkeys[10]), ActivationEpoch: strconv.FormatUint(uint64(testCase.epoch), 10), @@ -1242,27 +1187,16 @@ func TestGetDuties_Valid(t *testing.T) { nil, ).MinTimes(1) - prysmBeaconChainClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) - prysmBeaconChainClient.EXPECT().GetValidatorCount( - ctx, - gomock.Any(), - gomock.Any(), - ).Return( - nil, - iface.ErrNotSupported, - ).MinTimes(1) - // Make sure that our values are equal to what would be returned by calling getDutiesForEpoch individually validatorClient := &beaconApiValidatorClient{ dutiesProvider: dutiesProvider, stateValidatorsProvider: stateValidatorsProvider, - prysmBeaconChainCLient: prysmBeaconChainClient, } expectedCurrentEpochDuties, err := validatorClient.getDutiesForEpoch( ctx, testCase.epoch, - multipleValidatorStatus, + vals, fetchSyncDuties, ) require.NoError(t, err) @@ -1270,7 +1204,7 @@ func TestGetDuties_Valid(t *testing.T) { expectedNextEpochDuties, err := validatorClient.getDutiesForEpoch( ctx, testCase.epoch+1, - multipleValidatorStatus, + vals, fetchSyncDuties, ) require.NoError(t, err) @@ -1291,7 +1225,7 @@ func TestGetDuties_Valid(t *testing.T) { } } -func TestGetDuties_GetValidatorStatusFailed(t *testing.T) { +func TestGetDuties_GetStateValidatorsFailed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1316,7 +1250,7 @@ func TestGetDuties_GetValidatorStatusFailed(t *testing.T) { Epoch: 1, PublicKeys: [][]byte{}, }) - assert.ErrorContains(t, "failed to get validator status", err) + assert.ErrorContains(t, "failed to get state validators", err) assert.ErrorContains(t, "foo error", err) } @@ -1325,6 +1259,7 @@ func TestGetDuties_GetDutiesForEpochFailed(t *testing.T) { defer ctrl.Finish() ctx := context.Background() + pubkey := []byte{1, 2, 3} stateValidatorsProvider := mock.NewMockStateValidatorsProvider(ctrl) stateValidatorsProvider.EXPECT().GetStateValidators( @@ -1334,7 +1269,13 @@ func TestGetDuties_GetDutiesForEpochFailed(t *testing.T) { gomock.Any(), ).Return( &structs.GetValidatorsResponse{ - Data: []*structs.ValidatorContainer{}, + Data: []*structs.ValidatorContainer{{ + Index: "0", + Status: "active_ongoing", + Validator: &structs.Validator{ + Pubkey: hexutil.Encode(pubkey), + }, + }}, }, nil, ).Times(1) @@ -1348,26 +1289,28 @@ func TestGetDuties_GetDutiesForEpochFailed(t *testing.T) { nil, errors.New("foo error"), ).Times(1) - - prysmBeaconChainClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) - prysmBeaconChainClient.EXPECT().GetValidatorCount( + dutiesProvider.EXPECT().GetAttesterDuties( + ctx, + primitives.Epoch(2), + gomock.Any(), + ).Times(1) + dutiesProvider.EXPECT().GetProposerDuties( ctx, gomock.Any(), + ).Times(2) + dutiesProvider.EXPECT().GetCommittees( + ctx, gomock.Any(), - ).Return( - nil, - iface.ErrNotSupported, - ).MinTimes(1) + ).Times(2) validatorClient := &beaconApiValidatorClient{ stateValidatorsProvider: stateValidatorsProvider, dutiesProvider: dutiesProvider, - prysmBeaconChainCLient: prysmBeaconChainClient, } _, err := validatorClient.getDuties(ctx, ðpb.DutiesRequest{ Epoch: 1, - PublicKeys: [][]byte{}, + PublicKeys: [][]byte{pubkey}, }) assert.ErrorContains(t, "failed to get duties for current epoch `1`", err) assert.ErrorContains(t, "foo error", err) From c3a32716081269e98b76d6969342449c92453d51 Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 22 Mar 2024 15:35:18 +0900 Subject: [PATCH 4/5] bzl --- validator/client/beacon-api/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/client/beacon-api/BUILD.bazel b/validator/client/beacon-api/BUILD.bazel index f42373d18cc7..dfce02f40667 100644 --- a/validator/client/beacon-api/BUILD.bazel +++ b/validator/client/beacon-api/BUILD.bazel @@ -66,6 +66,7 @@ go_library( "@com_github_sirupsen_logrus//:go_default_library", "@org_golang_google_grpc//:go_default_library", "@org_golang_google_protobuf//types/known/timestamppb:go_default_library", + "@org_golang_x_sync//errgroup:go_default_library", ], ) @@ -129,7 +130,6 @@ go_test( "//proto/prysm/v1alpha1:go_default_library", "//testing/assert:go_default_library", "//testing/require:go_default_library", - "//testing/validator-mock:go_default_library", "//time/slots:go_default_library", "//validator/client/beacon-api/mock:go_default_library", "//validator/client/beacon-api/test-helpers:go_default_library", From 6e068638572dd1907d1a5a9649cb49e7cf7a147d Mon Sep 17 00:00:00 2001 From: rkapka Date: Fri, 22 Mar 2024 18:29:27 +0900 Subject: [PATCH 5/5] review --- validator/client/beacon-api/duties.go | 40 ++++++++++++++++----------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/validator/client/beacon-api/duties.go b/validator/client/beacon-api/duties.go index db162d003b00..881040445668 100644 --- a/validator/client/beacon-api/duties.go +++ b/validator/client/beacon-api/duties.go @@ -49,23 +49,24 @@ func (c beaconApiValidatorClient) getDuties(ctx context.Context, in *ethpb.Dutie // Sync committees are an Altair feature fetchSyncDuties := in.Epoch >= params.BeaconConfig().AltairForkEpoch - var wg errgroup.Group + errCh := make(chan error, 1) var currentEpochDuties []*ethpb.DutiesResponse_Duty - wg.Go(func() error { + go func() { currentEpochDuties, err = c.getDutiesForEpoch(ctx, in.Epoch, vals, fetchSyncDuties) if err != nil { - return errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch) + errCh <- errors.Wrapf(err, "failed to get duties for current epoch `%d`", in.Epoch) + return } - return nil - }) + errCh <- nil + }() nextEpochDuties, err := c.getDutiesForEpoch(ctx, in.Epoch+1, vals, fetchSyncDuties) if err != nil { return nil, errors.Wrapf(err, "failed to get duties for next epoch `%d`", in.Epoch+1) } - if err = wg.Wait(); err != nil { + if err = <-errCh; err != nil { return nil, err } @@ -86,10 +87,21 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( indices[i] = v.index } - var wg errgroup.Group + // Below variables MUST NOT be used in the main function before wg.Wait(). + // This is because they are populated in goroutines and wg.Wait() + // will return only once all goroutines finish their execution. // Mapping from a validator index to its attesting committee's index and slot attesterDutiesMapping := make(map[primitives.ValidatorIndex]committeeIndexSlotPair) + // Set containing all validator indices that are part of a sync committee for this epoch + syncDutiesMapping := make(map[primitives.ValidatorIndex]bool) + // Mapping from a validator index to its proposal slot + proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot) + // Mapping from the {committeeIndex, slot} to each of the committee's validator indices + committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex) + + var wg errgroup.Group + wg.Go(func() error { attesterDuties, err := c.dutiesProvider.GetAttesterDuties(ctx, epoch, indices) if err != nil { @@ -117,8 +129,6 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( return nil }) - // Set containing all validator indices that are part of a sync committee for this epoch - syncDutiesMapping := make(map[primitives.ValidatorIndex]bool) if fetchSyncDuties { wg.Go(func() error { syncDuties, err := c.dutiesProvider.GetSyncDuties(ctx, epoch, indices) @@ -137,8 +147,6 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( }) } - // Mapping from a validator index to its proposal slot - proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot) wg.Go(func() error { proposerDuties, err := c.dutiesProvider.GetProposerDuties(ctx, epoch) if err != nil { @@ -160,8 +168,6 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( return nil }) - // Mapping from the {committeeIndex, slot} to each of the committee's validator indices - committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex) committees, err := c.dutiesProvider.GetCommittees(ctx, epoch) if err != nil { return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch) @@ -205,9 +211,11 @@ func (c beaconApiValidatorClient) getDutiesForEpoch( duties := make([]*ethpb.DutiesResponse_Duty, len(vals)) for i, v := range vals { - var attesterSlot primitives.Slot - var committeeIndex primitives.CommitteeIndex - var committeeValidatorIndices []primitives.ValidatorIndex + var ( + attesterSlot primitives.Slot + committeeIndex primitives.CommitteeIndex + committeeValidatorIndices []primitives.ValidatorIndex + ) if committeeMappingKey, ok := attesterDutiesMapping[v.index]; ok { committeeIndex = committeeMappingKey.committeeIndex