Skip to content

Commit

Permalink
IndexedAtt wrapper for the slasher feed (#14150)
Browse files Browse the repository at this point in the history
* `IndexedAtt` wrapper for the slasher feed

* test fixes

* fix simulator

* fix e2e

* Revert "Auxiliary commit to revert individual files from 191bbf7"

This reverts commit 2b0441a23a0e5f66e50cf36c3bbfbb39d587b17b.

* extract interface from channel
  • Loading branch information
rkapka authored Jun 28, 2024
1 parent fa37072 commit 78cf75a
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 19 deletions.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_library(
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/slasher/types:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand Down Expand Up @@ -497,7 +498,7 @@ func (s *Service) sendBlockAttestationsToSlasher(signed interfaces.ReadOnlySigne
log.WithError(err).Error("Could not convert to indexed attestation")
return
}
s.cfg.SlasherAttestationsFeed.Send(indexedAtt)
s.cfg.SlasherAttestationsFeed.Send(&types.WrappedIndexedAtt{IndexedAtt: indexedAtt})
}
}

Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/slasher/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// Receive indexed attestations from some source event feed,
// validating their integrity before appending them to an attestation queue
// for batch processing in a separate routine.
func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan ethpb.IndexedAtt) {
func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan *slashertypes.WrappedIndexedAtt) {
defer s.wg.Done()

sub := s.serviceCfg.IndexedAttestationsFeed.Subscribe(indexedAttsChan)
Expand All @@ -39,7 +39,7 @@ func (s *Service) receiveAttestations(ctx context.Context, indexedAttsChan chan
continue
}
attWrapper := &slashertypes.IndexedAttestationWrapper{
IndexedAttestation: att,
IndexedAttestation: att.IndexedAtt,
DataRoot: dataRoot,
}
s.attsQueue.push(attWrapper)
Expand Down
25 changes: 15 additions & 10 deletions beacon-chain/slasher/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestSlasher_receiveAttestations_OK(t *testing.T) {
},
attsQueue: newAttestationsQueue(),
}
indexedAttsChan := make(chan ethpb.IndexedAtt)
indexedAttsChan := make(chan *slashertypes.WrappedIndexedAtt)
defer close(indexedAttsChan)

s.wg.Add(1)
Expand All @@ -40,13 +40,15 @@ func TestSlasher_receiveAttestations_OK(t *testing.T) {
secondIndices := []uint64{4, 5, 6}
att1 := createAttestationWrapperEmptySig(t, 1, 2, firstIndices, nil)
att2 := createAttestationWrapperEmptySig(t, 1, 2, secondIndices, nil)
indexedAttsChan <- att1.IndexedAttestation
indexedAttsChan <- att2.IndexedAttestation
wrappedAtt1 := &slashertypes.WrappedIndexedAtt{IndexedAtt: att1.IndexedAttestation}
wrappedAtt2 := &slashertypes.WrappedIndexedAtt{IndexedAtt: att2.IndexedAttestation}
indexedAttsChan <- wrappedAtt1
indexedAttsChan <- wrappedAtt2
cancel()
s.wg.Wait()
wanted := []*slashertypes.IndexedAttestationWrapper{
att1,
att2,
{IndexedAttestation: att1.IndexedAttestation, DataRoot: att1.DataRoot},
{IndexedAttestation: att2.IndexedAttestation, DataRoot: att2.DataRoot},
}
require.DeepEqual(t, wanted, s.attsQueue.dequeue())
}
Expand Down Expand Up @@ -212,7 +214,7 @@ func TestSlasher_receiveAttestations_OnlyValidAttestations(t *testing.T) {
},
attsQueue: newAttestationsQueue(),
}
indexedAttsChan := make(chan ethpb.IndexedAtt)
indexedAttsChan := make(chan *slashertypes.WrappedIndexedAtt)
defer close(indexedAttsChan)

s.wg.Add(1)
Expand All @@ -223,18 +225,21 @@ func TestSlasher_receiveAttestations_OnlyValidAttestations(t *testing.T) {
secondIndices := []uint64{4, 5, 6}
// Add a valid attestation.
validAtt := createAttestationWrapperEmptySig(t, 1, 2, firstIndices, nil)
indexedAttsChan <- validAtt.IndexedAttestation
wrappedValidAtt := &slashertypes.WrappedIndexedAtt{IndexedAtt: validAtt.IndexedAttestation}
indexedAttsChan <- wrappedValidAtt
// Send an invalid, bad attestation which will not
// pass integrity checks at it has invalid attestation data.
indexedAttsChan <- &ethpb.IndexedAttestation{
AttestingIndices: secondIndices,
indexedAttsChan <- &slashertypes.WrappedIndexedAtt{
IndexedAtt: &ethpb.IndexedAttestation{
AttestingIndices: secondIndices,
},
}
cancel()
s.wg.Wait()
// Expect only a single, valid attestation was added to the queue.
require.Equal(t, 1, s.attsQueue.size())
wanted := []*slashertypes.IndexedAttestationWrapper{
validAtt,
{IndexedAttestation: validAtt.IndexedAttestation, DataRoot: validAtt.DataRoot},
}
require.DeepEqual(t, wanted, s.attsQueue.dequeue())
}
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/slasher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
beaconChainSync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
Expand Down Expand Up @@ -117,7 +118,7 @@ func (s *Service) run() {
"Finished retrieving last epoch written per validator",
)

indexedAttsChan := make(chan ethpb.IndexedAtt, 1)
indexedAttsChan := make(chan *types.WrappedIndexedAtt, 1)
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)

s.wg.Add(1)
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/slasher/types/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = [
"//beacon-chain:__subpackages__",
"//cmd/prysmctl:__subpackages__",
"//testing/slasher/simulator:__subpackages__",
],
deps = [
"//consensus-types/primitives:go_default_library",
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/slasher/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ func (c ChunkKind) String() string {
}
}

// WrappedIndexedAtt is a wrapper over the IndexedAtt interface.
// The wrapper is needed to overcome the limitation of the event feed library
// which doesn't work well with interface types.
type WrappedIndexedAtt struct {
ethpb.IndexedAtt
}

// IndexedAttestationWrapper contains an indexed attestation with its
// data root to reduce duplicated computation.
type IndexedAttestationWrapper struct {
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ go_library(
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/types:go_default_library",
"//beacon-chain/slasher/types:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/sync/validate_beacon_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
tracing.AnnotateError(span, err)
return
}
s.cfg.slasherAttestationsFeed.Send(indexedAtt)
s.cfg.slasherAttestationsFeed.Send(&types.WrappedIndexedAtt{IndexedAtt: indexedAtt})
}()
}

Expand Down
4 changes: 2 additions & 2 deletions testing/endtoend/evaluators/slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var ValidatorsSlashedAfterEpoch = func(n primitives.Epoch) e2eTypes.Evaluator {
// SlashedValidatorsLoseBalanceAfterEpoch checks if the validators slashed lose the right balance.
var SlashedValidatorsLoseBalanceAfterEpoch = func(n primitives.Epoch) e2eTypes.Evaluator {
return e2eTypes.Evaluator{
Name: "slashed_validators_lose_valance_epoch_%d",
Name: "slashed_validators_lose_balance_epoch_%d",
Policy: policies.AfterNthEpoch(n),
Evaluation: validatorsLoseBalance,
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func validatorsLoseBalance(_ *e2eTypes.EvaluationContext, conns ...*grpc.ClientC
slashedBal := params.BeaconConfig().MaxEffectiveBalance - slashedPenalty + params.BeaconConfig().EffectiveBalanceIncrement/10
if valResp.EffectiveBalance >= slashedBal {
return fmt.Errorf(
"expected slashed validator %d to balance less than %d, received %d",
"expected slashed validator %d balance to be less than %d, received %d",
i,
slashedBal,
valResp.EffectiveBalance,
Expand Down
1 change: 1 addition & 0 deletions testing/slasher/simulator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//beacon-chain/db:go_default_library",
"//beacon-chain/operations/slashings:go_default_library",
"//beacon-chain/slasher:go_default_library",
"//beacon-chain/slasher/types:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
Expand Down
5 changes: 3 additions & 2 deletions testing/slasher/simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher"
slashertypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
Expand Down Expand Up @@ -211,7 +212,7 @@ func (s *Simulator) simulateBlocksAndAttestations(ctx context.Context) {
}
log.WithFields(logrus.Fields{
"numAtts": len(atts),
"numSlashable": len(propSlashings),
"numSlashable": len(attSlashings),
}).Infof("Producing attestations for slot %d", slot)
for _, sl := range attSlashings {
slashingRoot, err := sl.HashTreeRoot()
Expand All @@ -221,7 +222,7 @@ func (s *Simulator) simulateBlocksAndAttestations(ctx context.Context) {
s.sentAttesterSlashings[slashingRoot] = sl
}
for _, aa := range atts {
s.indexedAttsFeed.Send(aa)
s.indexedAttsFeed.Send(&slashertypes.WrappedIndexedAtt{IndexedAtt: aa})
}
case <-ctx.Done():
return
Expand Down

0 comments on commit 78cf75a

Please sign in to comment.