From 74ccb4a9dfda26b674b47555fbe8d1bcefa744b9 Mon Sep 17 00:00:00 2001 From: rkapka Date: Wed, 5 Jun 2024 20:24:18 +0200 Subject: [PATCH 01/10] EIP-7549: p2p and sync --- beacon-chain/p2p/gossip_topic_mappings.go | 25 ++- .../p2p/gossip_topic_mappings_test.go | 80 +++++-- beacon-chain/p2p/types/object_mapping.go | 78 +++++-- beacon-chain/p2p/types/object_mapping_test.go | 6 + beacon-chain/sync/BUILD.bazel | 3 +- beacon-chain/sync/decode_pubsub.go | 53 ++++- beacon-chain/sync/decode_pubsub_test.go | 199 ++++++++++++++++++ .../sync/pending_attestations_queue.go | 62 +++--- .../sync/pending_attestations_queue_test.go | 92 ++++++-- beacon-chain/sync/rpc_chunked_response.go | 30 +-- .../sync/rpc_chunked_response_test.go | 121 ----------- beacon-chain/sync/rpc_metadata.go | 30 +-- beacon-chain/sync/rpc_metadata_test.go | 81 ------- beacon-chain/sync/service.go | 4 +- .../sync/subscriber_beacon_aggregate_proof.go | 14 +- .../sync/subscriber_beacon_attestation.go | 12 +- beacon-chain/sync/validate_aggregate_proof.go | 132 ++++++++---- .../sync/validate_aggregate_proof_test.go | 6 +- .../sync/validate_beacon_attestation.go | 85 ++++++-- .../validate_beacon_attestation_electra.go | 27 +++ ...alidate_beacon_attestation_electra_test.go | 46 ++++ .../sync/validate_beacon_attestation_test.go | 4 +- 22 files changed, 782 insertions(+), 408 deletions(-) delete mode 100644 beacon-chain/sync/rpc_chunked_response_test.go create mode 100644 beacon-chain/sync/validate_beacon_attestation_electra.go create mode 100644 beacon-chain/sync/validate_beacon_attestation_electra_test.go diff --git a/beacon-chain/p2p/gossip_topic_mappings.go b/beacon-chain/p2p/gossip_topic_mappings.go index 76dfe722ed65..31733df0aa83 100644 --- a/beacon-chain/p2p/gossip_topic_mappings.go +++ b/beacon-chain/p2p/gossip_topic_mappings.go @@ -27,7 +27,8 @@ var gossipTopicMappings = map[string]proto.Message{ // GossipTopicMappings is a function to return the assigned data type // versioned by epoch. func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message { - if topic == BlockSubnetTopicFormat { + switch topic { + case BlockSubnetTopicFormat: if epoch >= params.BeaconConfig().ElectraForkEpoch { return ðpb.SignedBeaconBlockElectra{} } @@ -43,8 +44,25 @@ func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message { if epoch >= params.BeaconConfig().AltairForkEpoch { return ðpb.SignedBeaconBlockAltair{} } + return gossipTopicMappings[topic] + case AttestationSubnetTopicFormat: + if epoch >= params.BeaconConfig().ElectraForkEpoch { + return ðpb.AttestationElectra{} + } + return gossipTopicMappings[topic] + case AttesterSlashingSubnetTopicFormat: + if epoch >= params.BeaconConfig().ElectraForkEpoch { + return ðpb.AttesterSlashingElectra{} + } + return gossipTopicMappings[topic] + case AggregateAndProofSubnetTopicFormat: + if epoch >= params.BeaconConfig().ElectraForkEpoch { + return ðpb.SignedAggregateAttestationAndProofElectra{} + } + return gossipTopicMappings[topic] + default: + return gossipTopicMappings[topic] } - return gossipTopicMappings[topic] } // AllTopics returns all topics stored in our @@ -75,4 +93,7 @@ func init() { GossipTypeMapping[reflect.TypeOf(ðpb.SignedBeaconBlockDeneb{})] = BlockSubnetTopicFormat // Specially handle Electra objects. GossipTypeMapping[reflect.TypeOf(ðpb.SignedBeaconBlockElectra{})] = BlockSubnetTopicFormat + GossipTypeMapping[reflect.TypeOf(ðpb.AttestationElectra{})] = AttestationSubnetTopicFormat + GossipTypeMapping[reflect.TypeOf(ðpb.AttesterSlashingElectra{})] = AttesterSlashingSubnetTopicFormat + GossipTypeMapping[reflect.TypeOf(ðpb.SignedAggregateAttestationAndProofElectra{})] = AggregateAndProofSubnetTopicFormat } diff --git a/beacon-chain/p2p/gossip_topic_mappings_test.go b/beacon-chain/p2p/gossip_topic_mappings_test.go index 88b03ba642fa..efe7f00e6e4f 100644 --- a/beacon-chain/p2p/gossip_topic_mappings_test.go +++ b/beacon-chain/p2p/gossip_topic_mappings_test.go @@ -22,20 +22,20 @@ func TestMappingHasNoDuplicates(t *testing.T) { } } -func TestGossipTopicMappings_CorrectBlockType(t *testing.T) { +func TestGossipTopicMappings_CorrectType(t *testing.T) { params.SetupTestConfigCleanup(t) bCfg := params.BeaconConfig().Copy() altairForkEpoch := primitives.Epoch(100) - BellatrixForkEpoch := primitives.Epoch(200) - CapellaForkEpoch := primitives.Epoch(300) - DenebForkEpoch := primitives.Epoch(400) - ElectraForkEpoch := primitives.Epoch(500) + bellatrixForkEpoch := primitives.Epoch(200) + capellaForkEpoch := primitives.Epoch(300) + denebForkEpoch := primitives.Epoch(400) + electraForkEpoch := primitives.Epoch(500) bCfg.AltairForkEpoch = altairForkEpoch - bCfg.BellatrixForkEpoch = BellatrixForkEpoch - bCfg.CapellaForkEpoch = CapellaForkEpoch - bCfg.DenebForkEpoch = DenebForkEpoch - bCfg.ElectraForkEpoch = ElectraForkEpoch + bCfg.BellatrixForkEpoch = bellatrixForkEpoch + bCfg.CapellaForkEpoch = capellaForkEpoch + bCfg.DenebForkEpoch = denebForkEpoch + bCfg.ElectraForkEpoch = electraForkEpoch bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = primitives.Epoch(100) bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.BellatrixForkVersion)] = primitives.Epoch(200) bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.CapellaForkVersion)] = primitives.Epoch(300) @@ -47,29 +47,83 @@ func TestGossipTopicMappings_CorrectBlockType(t *testing.T) { pMessage := GossipTopicMappings(BlockSubnetTopicFormat, 0) _, ok := pMessage.(*ethpb.SignedBeaconBlock) assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, 0) + _, ok = pMessage.(*ethpb.Attestation) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, 0) + _, ok = pMessage.(*ethpb.AttesterSlashing) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, 0) + _, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof) + assert.Equal(t, true, ok) // Altair Fork pMessage = GossipTopicMappings(BlockSubnetTopicFormat, altairForkEpoch) _, ok = pMessage.(*ethpb.SignedBeaconBlockAltair) assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, altairForkEpoch) + _, ok = pMessage.(*ethpb.Attestation) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, altairForkEpoch) + _, ok = pMessage.(*ethpb.AttesterSlashing) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, altairForkEpoch) + _, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof) + assert.Equal(t, true, ok) // Bellatrix Fork - pMessage = GossipTopicMappings(BlockSubnetTopicFormat, BellatrixForkEpoch) + pMessage = GossipTopicMappings(BlockSubnetTopicFormat, bellatrixForkEpoch) _, ok = pMessage.(*ethpb.SignedBeaconBlockBellatrix) assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, bellatrixForkEpoch) + _, ok = pMessage.(*ethpb.Attestation) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, bellatrixForkEpoch) + _, ok = pMessage.(*ethpb.AttesterSlashing) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, bellatrixForkEpoch) + _, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof) + assert.Equal(t, true, ok) // Capella Fork - pMessage = GossipTopicMappings(BlockSubnetTopicFormat, CapellaForkEpoch) + pMessage = GossipTopicMappings(BlockSubnetTopicFormat, capellaForkEpoch) _, ok = pMessage.(*ethpb.SignedBeaconBlockCapella) assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, capellaForkEpoch) + _, ok = pMessage.(*ethpb.Attestation) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, capellaForkEpoch) + _, ok = pMessage.(*ethpb.AttesterSlashing) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, capellaForkEpoch) + _, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof) + assert.Equal(t, true, ok) // Deneb Fork - pMessage = GossipTopicMappings(BlockSubnetTopicFormat, DenebForkEpoch) + pMessage = GossipTopicMappings(BlockSubnetTopicFormat, denebForkEpoch) _, ok = pMessage.(*ethpb.SignedBeaconBlockDeneb) assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, denebForkEpoch) + _, ok = pMessage.(*ethpb.Attestation) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, denebForkEpoch) + _, ok = pMessage.(*ethpb.AttesterSlashing) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, denebForkEpoch) + _, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProof) + assert.Equal(t, true, ok) // Electra Fork - pMessage = GossipTopicMappings(BlockSubnetTopicFormat, ElectraForkEpoch) + pMessage = GossipTopicMappings(BlockSubnetTopicFormat, electraForkEpoch) _, ok = pMessage.(*ethpb.SignedBeaconBlockElectra) assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, electraForkEpoch) + _, ok = pMessage.(*ethpb.AttestationElectra) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, electraForkEpoch) + _, ok = pMessage.(*ethpb.AttesterSlashingElectra) + assert.Equal(t, true, ok) + pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, electraForkEpoch) + _, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProofElectra) + assert.Equal(t, true, ok) } diff --git a/beacon-chain/p2p/types/object_mapping.go b/beacon-chain/p2p/types/object_mapping.go index 2698bf93f54a..e8646b34ee7a 100644 --- a/beacon-chain/p2p/types/object_mapping.go +++ b/beacon-chain/p2p/types/object_mapping.go @@ -26,7 +26,13 @@ var ( BlockMap map[[4]byte]func() (interfaces.ReadOnlySignedBeaconBlock, error) // MetaDataMap maps the fork-version to the underlying data type for that // particular fork period. - MetaDataMap map[[4]byte]func() metadata.Metadata + MetaDataMap map[[4]byte]func() (metadata.Metadata, error) + // AttestationMap maps the fork-version to the underlying data type for that + // particular fork period. + AttestationMap map[[4]byte]func() (ethpb.Att, error) + // AggregateAttestationMap maps the fork-version to the underlying data type for that + // particular fork period. + AggregateAttestationMap map[[4]byte]func() (ethpb.SignedAggregateAttAndProof, error) ) // InitializeDataMaps initializes all the relevant object maps. This function is called to @@ -67,24 +73,68 @@ func InitializeDataMaps() { } // Reset our metadata map. - MetaDataMap = map[[4]byte]func() metadata.Metadata{ - bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion): func() metadata.Metadata { - return wrapper.WrappedMetadataV0(ðpb.MetaDataV0{}) + MetaDataMap = map[[4]byte]func() (metadata.Metadata, error){ + bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion): func() (metadata.Metadata, error) { + return wrapper.WrappedMetadataV0(ðpb.MetaDataV0{}), nil + }, + bytesutil.ToBytes4(params.BeaconConfig().AltairForkVersion): func() (metadata.Metadata, error) { + return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), nil + }, + bytesutil.ToBytes4(params.BeaconConfig().BellatrixForkVersion): func() (metadata.Metadata, error) { + return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), nil + }, + bytesutil.ToBytes4(params.BeaconConfig().CapellaForkVersion): func() (metadata.Metadata, error) { + return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), nil + }, + bytesutil.ToBytes4(params.BeaconConfig().DenebForkVersion): func() (metadata.Metadata, error) { + return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), nil + }, + bytesutil.ToBytes4(params.BeaconConfig().ElectraForkVersion): func() (metadata.Metadata, error) { + return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), nil + }, + } + + // Reset our attestation map. + AttestationMap = map[[4]byte]func() (ethpb.Att, error){ + bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion): func() (ethpb.Att, error) { + return ðpb.Attestation{}, nil + }, + bytesutil.ToBytes4(params.BeaconConfig().AltairForkVersion): func() (ethpb.Att, error) { + return ðpb.Attestation{}, nil + }, + bytesutil.ToBytes4(params.BeaconConfig().BellatrixForkVersion): func() (ethpb.Att, error) { + return ðpb.Attestation{}, nil + }, + bytesutil.ToBytes4(params.BeaconConfig().CapellaForkVersion): func() (ethpb.Att, error) { + return ðpb.Attestation{}, nil + }, + bytesutil.ToBytes4(params.BeaconConfig().DenebForkVersion): func() (ethpb.Att, error) { + return ðpb.Attestation{}, nil + }, + bytesutil.ToBytes4(params.BeaconConfig().ElectraForkVersion): func() (ethpb.Att, error) { + return ðpb.AttestationElectra{}, nil + }, + } + + // Reset our aggregate attestation map. + AggregateAttestationMap = map[[4]byte]func() (ethpb.SignedAggregateAttAndProof, error){ + bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion): func() (ethpb.SignedAggregateAttAndProof, error) { + return ðpb.SignedAggregateAttestationAndProof{}, nil }, - bytesutil.ToBytes4(params.BeaconConfig().AltairForkVersion): func() metadata.Metadata { - return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}) + bytesutil.ToBytes4(params.BeaconConfig().AltairForkVersion): func() (ethpb.SignedAggregateAttAndProof, error) { + return ðpb.SignedAggregateAttestationAndProof{}, nil }, - bytesutil.ToBytes4(params.BeaconConfig().BellatrixForkVersion): func() metadata.Metadata { - return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}) + bytesutil.ToBytes4(params.BeaconConfig().BellatrixForkVersion): func() (ethpb.SignedAggregateAttAndProof, error) { + return ðpb.SignedAggregateAttestationAndProof{}, nil }, - bytesutil.ToBytes4(params.BeaconConfig().CapellaForkVersion): func() metadata.Metadata { - return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}) + bytesutil.ToBytes4(params.BeaconConfig().CapellaForkVersion): func() (ethpb.SignedAggregateAttAndProof, error) { + return ðpb.SignedAggregateAttestationAndProof{}, nil }, - bytesutil.ToBytes4(params.BeaconConfig().DenebForkVersion): func() metadata.Metadata { - return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}) + bytesutil.ToBytes4(params.BeaconConfig().DenebForkVersion): func() (ethpb.SignedAggregateAttAndProof, error) { + return ðpb.SignedAggregateAttestationAndProof{}, nil }, - bytesutil.ToBytes4(params.BeaconConfig().ElectraForkVersion): func() metadata.Metadata { - return wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}) + bytesutil.ToBytes4(params.BeaconConfig().ElectraForkVersion): func() (ethpb.SignedAggregateAttAndProof, error) { + return ðpb.SignedAggregateAttestationAndProofElectra{}, nil }, } } diff --git a/beacon-chain/p2p/types/object_mapping_test.go b/beacon-chain/p2p/types/object_mapping_test.go index 67c1fccaeec2..119c320d280f 100644 --- a/beacon-chain/p2p/types/object_mapping_test.go +++ b/beacon-chain/p2p/types/object_mapping_test.go @@ -46,6 +46,12 @@ func TestInitializeDataMaps(t *testing.T) { tt.action() _, ok := BlockMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] assert.Equal(t, tt.exists, ok) + _, ok = MetaDataMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + assert.Equal(t, tt.exists, ok) + _, ok = AttestationMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + assert.Equal(t, tt.exists, ok) + _, ok = AggregateAttestationMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + assert.Equal(t, tt.exists, ok) }) } } diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 3a1a4673fbb9..c33ebdeb2e41 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "validate_aggregate_proof.go", "validate_attester_slashing.go", "validate_beacon_attestation.go", + "validate_beacon_attestation_electra.go", "validate_beacon_blocks.go", "validate_blob.go", "validate_bls_to_execution_change.go", @@ -160,7 +161,6 @@ go_test( "rpc_beacon_blocks_by_root_test.go", "rpc_blob_sidecars_by_range_test.go", "rpc_blob_sidecars_by_root_test.go", - "rpc_chunked_response_test.go", "rpc_goodbye_test.go", "rpc_handler_test.go", "rpc_metadata_test.go", @@ -177,6 +177,7 @@ go_test( "sync_test.go", "validate_aggregate_proof_test.go", "validate_attester_slashing_test.go", + "validate_beacon_attestation_electra_test.go", "validate_beacon_attestation_test.go", "validate_beacon_blocks_test.go", "validate_blob_test.go", diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index d5524f70a198..57661f527106 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -1,13 +1,19 @@ package sync import ( + "fmt" "reflect" "strings" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/pkg/errors" ssz "github.com/prysmaticlabs/fastssz" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "google.golang.org/protobuf/proto" ) @@ -51,7 +57,19 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err } // Handle different message types across forks. if topic == p2p.BlockSubnetTopicFormat { - m, err = extractBlockDataType(fDigest[:], s.cfg.clock) + m, err = extractDataType(types.BlockMap, fDigest[:], s.cfg.clock) + if err != nil { + return nil, err + } + } + if topic == p2p.AttestationSubnetTopicFormat { + m, err = extractDataType(types.AttestationMap, fDigest[:], s.cfg.clock) + if err != nil { + return nil, err + } + } + if topic == p2p.AggregateAndProofSubnetTopicFormat { + m, err = extractDataType(types.AggregateAttestationMap, fDigest[:], s.cfg.clock) if err != nil { return nil, err } @@ -71,3 +89,36 @@ func (_ *Service) replaceForkDigest(topic string) (string, error) { subStrings[2] = "%x" return strings.Join(subStrings, "/"), nil } + +func extractDataType[T any](typeMap map[[4]byte]func() (T, error), digest []byte, tor blockchain.TemporalOracle) (T, error) { + var zero T + + if len(digest) == 0 { + f, ok := typeMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + if !ok { + return zero, fmt.Errorf("no %T type exists for the genesis fork version", zero) + } + return f() + } + if len(digest) != forkDigestLength { + return zero, errors.Errorf("invalid digest returned, wanted a length of %d but received %d", forkDigestLength, len(digest)) + } + vRoot := tor.GenesisValidatorsRoot() + for k, f := range typeMap { + rDigest, err := signing.ComputeForkDigest(k[:], vRoot[:]) + if err != nil { + return zero, err + } + if rDigest == bytesutil.ToBytes4(digest) { + return f() + } + } + return zero, errors.Wrapf( + ErrNoValidDigest, + "could not extract %T data type, saw digest=%#x, genesis=%v, vr=%#x", + zero, + digest, + tor.GenesisTime(), + tor.GenesisValidatorsRoot(), + ) +} diff --git a/beacon-chain/sync/decode_pubsub_test.go b/beacon-chain/sync/decode_pubsub_test.go index 1bc0f1c06f17..cb22ece372b7 100644 --- a/beacon-chain/sync/decode_pubsub_test.go +++ b/beacon-chain/sync/decode_pubsub_test.go @@ -11,15 +11,20 @@ import ( "github.com/d4l3k/messagediff" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" p2ptesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" + "github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" + enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" ) @@ -109,3 +114,197 @@ func TestService_decodePubsubMessage(t *testing.T) { }) } } + +func TestExtractDataType(t *testing.T) { + // Precompute digests + genDigest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + altairDigest, err := signing.ComputeForkDigest(params.BeaconConfig().AltairForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + bellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + capellaDigest, err := signing.ComputeForkDigest(params.BeaconConfig().CapellaForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + denebDigest, err := signing.ComputeForkDigest(params.BeaconConfig().DenebForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + electraDigest, err := signing.ComputeForkDigest(params.BeaconConfig().ElectraForkVersion, params.BeaconConfig().ZeroHash[:]) + require.NoError(t, err) + + type args struct { + digest []byte + chain blockchain.ChainInfoFetcher + } + tests := []struct { + name string + args args + wantBlock interfaces.ReadOnlySignedBeaconBlock + wantMd metadata.Metadata + wantAtt ethpb.Att + wantAggregate ethpb.SignedAggregateAttAndProof + wantErr bool + }{ + { + name: "no digest", + args: args{ + digest: []byte{}, + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: func() interfaces.ReadOnlySignedBeaconBlock { + wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Body: ðpb.BeaconBlockBody{}}}) + require.NoError(t, err) + return wsb + }(), + wantMd: wrapper.WrappedMetadataV0(ðpb.MetaDataV0{}), + wantAtt: ðpb.Attestation{}, + wantAggregate: ðpb.SignedAggregateAttestationAndProof{}, + wantErr: false, + }, + { + name: "invalid digest", + args: args{ + digest: []byte{0x00, 0x01}, + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: nil, + wantMd: nil, + wantAtt: nil, + wantAggregate: nil, + wantErr: true, + }, + { + name: "non existent digest", + args: args{ + digest: []byte{0x00, 0x01, 0x02, 0x03}, + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: nil, + wantMd: nil, + wantAtt: nil, + wantAggregate: nil, + wantErr: true, + }, + { + name: "genesis fork version", + args: args{ + digest: genDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: func() interfaces.ReadOnlySignedBeaconBlock { + wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Body: ðpb.BeaconBlockBody{}}}) + require.NoError(t, err) + return wsb + }(), + wantAtt: ðpb.Attestation{}, + wantAggregate: ðpb.SignedAggregateAttestationAndProof{}, + wantErr: false, + }, + { + name: "altair fork version", + args: args{ + digest: altairDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: func() interfaces.ReadOnlySignedBeaconBlock { + wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlockAltair{Block: ðpb.BeaconBlockAltair{Body: ðpb.BeaconBlockBodyAltair{}}}) + require.NoError(t, err) + return wsb + }(), + wantMd: wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), + wantAtt: ðpb.Attestation{}, + wantAggregate: ðpb.SignedAggregateAttestationAndProof{}, + wantErr: false, + }, + { + name: "bellatrix fork version", + args: args{ + digest: bellatrixDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: func() interfaces.ReadOnlySignedBeaconBlock { + wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlockBellatrix{Block: ðpb.BeaconBlockBellatrix{Body: ðpb.BeaconBlockBodyBellatrix{ExecutionPayload: &enginev1.ExecutionPayload{}}}}) + require.NoError(t, err) + return wsb + }(), + wantMd: wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), + wantAtt: ðpb.Attestation{}, + wantAggregate: ðpb.SignedAggregateAttestationAndProof{}, + wantErr: false, + }, + { + name: "capella fork version", + args: args{ + digest: capellaDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: func() interfaces.ReadOnlySignedBeaconBlock { + wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlockCapella{Block: ðpb.BeaconBlockCapella{Body: ðpb.BeaconBlockBodyCapella{ExecutionPayload: &enginev1.ExecutionPayloadCapella{}}}}) + require.NoError(t, err) + return wsb + }(), + wantMd: wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), + wantAtt: ðpb.Attestation{}, + wantAggregate: ðpb.SignedAggregateAttestationAndProof{}, + wantErr: false, + }, + { + name: "deneb fork version", + args: args{ + digest: denebDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: func() interfaces.ReadOnlySignedBeaconBlock { + wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlockDeneb{Block: ðpb.BeaconBlockDeneb{Body: ðpb.BeaconBlockBodyDeneb{ExecutionPayload: &enginev1.ExecutionPayloadDeneb{}}}}) + require.NoError(t, err) + return wsb + }(), + wantMd: wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), + wantAtt: ðpb.Attestation{}, + wantAggregate: ðpb.SignedAggregateAttestationAndProof{}, + wantErr: false, + }, + { + name: "electra fork version", + args: args{ + digest: electraDigest[:], + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + }, + wantBlock: func() interfaces.ReadOnlySignedBeaconBlock { + wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlockElectra{Block: ðpb.BeaconBlockElectra{Body: ðpb.BeaconBlockBodyElectra{ExecutionPayload: &enginev1.ExecutionPayloadElectra{}}}}) + require.NoError(t, err) + return wsb + }(), + wantMd: wrapper.WrappedMetadataV1(ðpb.MetaDataV1{}), + wantAtt: ðpb.AttestationElectra{}, + wantAggregate: ðpb.SignedAggregateAttestationAndProofElectra{}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotBlock, err := extractDataType(types.BlockMap, tt.args.digest, tt.args.chain) + if (err != nil) != tt.wantErr { + t.Errorf("block: error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotBlock, tt.wantBlock) { + t.Errorf("block: got = %v, want %v", gotBlock, tt.wantBlock) + } + gotAtt, err := extractDataType(types.AttestationMap, tt.args.digest, tt.args.chain) + if (err != nil) != tt.wantErr { + t.Errorf("attestation: error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotAtt, tt.wantAtt) { + t.Errorf("attestation: got = %v, want %v", gotAtt, tt.wantAtt) + } + gotAggregate, err := extractDataType(types.AggregateAttestationMap, tt.args.digest, tt.args.chain) + if (err != nil) != tt.wantErr { + t.Errorf("aggregate: error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotAggregate, tt.wantAggregate) { + t.Errorf("aggregate: got = %v, want %v", gotAggregate, tt.wantAggregate) + } + }) + } +} diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index cb936a9330e6..f827d4fd6b11 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -15,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/crypto/rand" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -87,12 +88,13 @@ func (s *Service) processPendingAtts(ctx context.Context) error { return s.sendBatchRootRequest(ctx, pendingRoots, randGen) } -func (s *Service) processAttestations(ctx context.Context, attestations []*ethpb.SignedAggregateAttestationAndProof) { +func (s *Service) processAttestations(ctx context.Context, attestations []ethpb.SignedAggregateAttAndProof) { for _, signedAtt := range attestations { - att := signedAtt.Message + aggregate := signedAtt.AggregateAttestationAndProof().AggregateVal() + data := aggregate.GetData() // The pending attestations can arrive in both aggregated and unaggregated forms, // each from has distinct validation steps. - if helpers.IsAggregated(att.Aggregate) { + if helpers.IsAggregated(aggregate) { // Save the pending aggregated attestation to the pool if it passes the aggregated // validation steps. valRes, err := s.validateAggregatedAtt(ctx, signedAtt) @@ -101,11 +103,11 @@ func (s *Service) processAttestations(ctx context.Context, attestations []*ethpb } aggValid := pubsub.ValidationAccept == valRes if s.validateBlockInAttestation(ctx, signedAtt) && aggValid { - if err := s.cfg.attPool.SaveAggregatedAttestation(att.Aggregate); err != nil { + if err := s.cfg.attPool.SaveAggregatedAttestation(aggregate); err != nil { log.WithError(err).Debug("Could not save aggregate attestation") continue } - s.setAggregatorIndexEpochSeen(att.Aggregate.Data.Target.Epoch, att.AggregatorIndex) + s.setAggregatorIndexEpochSeen(data.Target.Epoch, signedAtt.AggregateAttestationAndProof().GetAggregatorIndex()) // Broadcasting the signed attestation again once a node is able to process it. if err := s.cfg.p2p.Broadcast(ctx, signedAtt); err != nil { @@ -116,39 +118,39 @@ func (s *Service) processAttestations(ctx context.Context, attestations []*ethpb // This is an important validation before retrieving attestation pre state to defend against // attestation's target intentionally reference checkpoint that's long ago. // Verify current finalized checkpoint is an ancestor of the block defined by the attestation's beacon block root. - if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)) { + if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) { log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency") continue } - if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, att.Aggregate); err != nil { + if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, aggregate); err != nil { log.WithError(err).Debug("Could not verify FFG consistency") continue } - preState, err := s.cfg.chain.AttestationTargetState(ctx, att.Aggregate.Data.Target) + preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target) if err != nil { log.WithError(err).Debug("Could not retrieve attestation prestate") continue } - valid, err := s.validateUnaggregatedAttWithState(ctx, att.Aggregate, preState) + valid, err := s.validateUnaggregatedAttWithState(ctx, aggregate, preState) if err != nil { log.WithError(err).Debug("Pending unaggregated attestation failed validation") continue } if valid == pubsub.ValidationAccept { - if err := s.cfg.attPool.SaveUnaggregatedAttestation(att.Aggregate); err != nil { + if err := s.cfg.attPool.SaveUnaggregatedAttestation(aggregate); err != nil { log.WithError(err).Debug("Could not save unaggregated attestation") continue } - s.setSeenCommitteeIndicesSlot(att.Aggregate.Data.Slot, att.Aggregate.Data.CommitteeIndex, att.Aggregate.AggregationBits) + s.setSeenCommitteeIndicesSlot(data.Slot, data.CommitteeIndex, aggregate.GetAggregationBits()) - valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(att.Aggregate.Data.Slot)) + valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot)) if err != nil { log.WithError(err).Debug("Could not retrieve active validator count") continue } // Broadcasting the signed attestation again once a node is able to process it. - if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, signedAtt.Message.Aggregate), signedAtt.Message.Aggregate); err != nil { + if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, aggregate), aggregate); err != nil { log.WithError(err).Debug("Could not broadcast") } } @@ -160,8 +162,8 @@ func (s *Service) processAttestations(ctx context.Context, attestations []*ethpb // root of the missing block. The value is the list of pending attestations // that voted for that block root. The caller of this function is responsible // for not sending repeated attestations to the pending queue. -func (s *Service) savePendingAtt(att *ethpb.SignedAggregateAttestationAndProof) { - root := bytesutil.ToBytes32(att.Message.Aggregate.Data.BeaconBlockRoot) +func (s *Service) savePendingAtt(att ethpb.SignedAggregateAttAndProof) { + root := bytesutil.ToBytes32(att.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot) s.pendingAttsLock.Lock() defer s.pendingAttsLock.Unlock() @@ -178,7 +180,7 @@ func (s *Service) savePendingAtt(att *ethpb.SignedAggregateAttestationAndProof) _, ok := s.blkRootToPendingAtts[root] if !ok { pendingAttCount.Inc() - s.blkRootToPendingAtts[root] = []*ethpb.SignedAggregateAttestationAndProof{att} + s.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{att} return } // Skip if the attestation from the same aggregator already exists in @@ -192,20 +194,32 @@ func (s *Service) savePendingAtt(att *ethpb.SignedAggregateAttestationAndProof) s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], att) } -func attsAreEqual(a, b *ethpb.SignedAggregateAttestationAndProof) bool { - if a.Signature != nil { - return b.Signature != nil && a.Message.AggregatorIndex == b.Message.AggregatorIndex +func attsAreEqual(a, b ethpb.SignedAggregateAttAndProof) bool { + if a.GetSignature() != nil { + return b.GetSignature() != nil && a.AggregateAttestationAndProof().GetAggregatorIndex() == b.AggregateAttestationAndProof().GetAggregatorIndex() } - if b.Signature != nil { + if b.GetSignature() != nil { return false } - if a.Message.Aggregate.Data.Slot != b.Message.Aggregate.Data.Slot { + + aAggregate := a.AggregateAttestationAndProof().AggregateVal() + bAggregate := b.AggregateAttestationAndProof().AggregateVal() + aData := aAggregate.GetData() + bData := bAggregate.GetData() + + if aData.Slot != bData.Slot { return false } - if a.Message.Aggregate.Data.CommitteeIndex != b.Message.Aggregate.Data.CommitteeIndex { + + if a.Version() >= version.Electra { + if !bytes.Equal(aAggregate.CommitteeBitsVal().Bytes(), bAggregate.CommitteeBitsVal().Bytes()) { + return false + } + } else if aData.CommitteeIndex != bData.CommitteeIndex { return false } - return bytes.Equal(a.Message.Aggregate.AggregationBits, b.Message.Aggregate.AggregationBits) + + return bytes.Equal(aAggregate.GetAggregationBits(), bAggregate.GetAggregationBits()) } // This validates the pending attestations in the queue are still valid. @@ -221,7 +235,7 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot primitives.Slot) for bRoot, atts := range s.blkRootToPendingAtts { for i := len(atts) - 1; i >= 0; i-- { - if slot >= atts[i].Message.Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch { + if slot >= atts[i].AggregateAttestationAndProof().AggregateVal().GetData().Slot+params.BeaconConfig().SlotsPerEpoch { // Remove the pending attestation from the list in place. atts = append(atts[:i], atts[i+1:]...) } diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index b99e15d5ffec..42252d03acd0 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -46,12 +46,12 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) { chain := &mock.ChainService{Genesis: prysmTime.Now(), FinalizedCheckPoint: ðpb.Checkpoint{}} r := &Service{ cfg: &config{p2p: p1, beaconDB: db, chain: chain, clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot)}, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), chainStarted: abool.New(), } a := ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{Target: ðpb.Checkpoint{Root: make([]byte, 32)}}}} - r.blkRootToPendingAtts[[32]byte{'A'}] = []*ethpb.SignedAggregateAttestationAndProof{{Message: a}} + r.blkRootToPendingAtts[[32]byte{'A'}] = []ethpb.SignedAggregateAttAndProof{ðpb.SignedAggregateAttestationAndProof{Message: a}} require.NoError(t, r.processPendingAtts(context.Background())) require.LogsContain(t, hook, "Requesting block by root") } @@ -124,7 +124,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) { clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), attPool: attestations.NewPool(), }, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), seenUnAggregatedAttestationCache: lruwrpr.New(10), signatureChan: make(chan *signatureVerifier, verifierLimit), } @@ -134,7 +134,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) { require.NoError(t, err) require.NoError(t, r.cfg.beaconDB.SaveState(context.Background(), s, root)) - r.blkRootToPendingAtts[root] = []*ethpb.SignedAggregateAttestationAndProof{{Message: aggregateAndProof, Signature: aggreSig}} + r.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{ðpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}} require.NoError(t, r.processPendingAtts(context.Background())) atts, err := r.cfg.attPool.UnaggregatedAttestations() @@ -162,7 +162,7 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) { clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), attPool: attestations.NewPool(), }, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), } priv, err := bls.RandKey() @@ -182,7 +182,7 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) { util.SaveBlock(t, context.Background(), r.cfg.beaconDB, b) require.NoError(t, r.cfg.beaconDB.SaveState(context.Background(), s, r32)) - r.blkRootToPendingAtts[r32] = []*ethpb.SignedAggregateAttestationAndProof{{Message: a, Signature: make([]byte, fieldparams.BLSSignatureLength)}} + r.blkRootToPendingAtts[r32] = []ethpb.SignedAggregateAttAndProof{ðpb.SignedAggregateAttestationAndProof{Message: a, Signature: make([]byte, fieldparams.BLSSignatureLength)}} require.NoError(t, r.processPendingAtts(context.Background())) assert.Equal(t, false, p1.BroadcastCalled.Load(), "Broadcasted bad aggregate") @@ -245,13 +245,13 @@ func TestProcessPendingAtts_NoBroadcastWithBadSignature(t *testing.T) { clock: startup.NewClock(chain2.Genesis, chain2.ValidatorsRoot), attPool: attestations.NewPool(), }, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), seenUnAggregatedAttestationCache: lruwrpr.New(10), signatureChan: make(chan *signatureVerifier, verifierLimit), } go r.verifierRoutine() - r.blkRootToPendingAtts[r32] = []*ethpb.SignedAggregateAttestationAndProof{{Message: aggregateAndProof, Signature: aggreSig}} + r.blkRootToPendingAtts[r32] = []ethpb.SignedAggregateAttAndProof{ðpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}} require.NoError(t, r.processPendingAtts(context.Background())) assert.Equal(t, true, p1.BroadcastCalled.Load(), "Could not broadcast the good aggregate") @@ -330,7 +330,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) { clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), attPool: attestations.NewPool(), }, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), seenAggregatedAttestationCache: lruwrpr.New(10), signatureChan: make(chan *signatureVerifier, verifierLimit), } @@ -339,7 +339,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) { require.NoError(t, err) require.NoError(t, r.cfg.beaconDB.SaveState(context.Background(), s, root)) - r.blkRootToPendingAtts[root] = []*ethpb.SignedAggregateAttestationAndProof{{Message: aggregateAndProof, Signature: aggreSig}} + r.blkRootToPendingAtts[root] = []ethpb.SignedAggregateAttAndProof{ðpb.SignedAggregateAttestationAndProof{Message: aggregateAndProof, Signature: aggreSig}} require.NoError(t, r.processPendingAtts(context.Background())) assert.Equal(t, 1, len(r.cfg.attPool.AggregatedAttestations()), "Did not save aggregated att") @@ -353,7 +353,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) { func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) { s := &Service{ - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), } // 100 Attestations per block root. @@ -401,7 +401,7 @@ func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) { func TestValidatePendingAtts_NoDuplicatingAtts(t *testing.T) { s := &Service{ - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), } r1 := [32]byte{'A'} @@ -428,7 +428,7 @@ func TestValidatePendingAtts_NoDuplicatingAtts(t *testing.T) { func TestSavePendingAtts_BeyondLimit(t *testing.T) { s := &Service{ - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), } for i := 0; i < pendingAttsLimit; i++ { @@ -457,5 +457,71 @@ func TestSavePendingAtts_BeyondLimit(t *testing.T) { assert.Equal(t, 0, len(s.blkRootToPendingAtts[r1]), "Saved pending atts") assert.Equal(t, 0, len(s.blkRootToPendingAtts[r2]), "Saved pending atts") +} +func Test_attsAreEqual_committee(t *testing.T) { + t.Run("Phase 0 equal", func(t *testing.T) { + att1 := ðpb.SignedAggregateAttestationAndProof{ + Message: ðpb.AggregateAttestationAndProof{ + Aggregate: ðpb.Attestation{ + Data: ðpb.AttestationData{ + CommitteeIndex: 123}}}} + att2 := ðpb.SignedAggregateAttestationAndProof{ + Message: ðpb.AggregateAttestationAndProof{ + Aggregate: ðpb.Attestation{ + Data: ðpb.AttestationData{ + CommitteeIndex: 123}}}} + assert.Equal(t, true, attsAreEqual(att1, att2)) + }) + t.Run("Phase 0 not equal", func(t *testing.T) { + att1 := ðpb.SignedAggregateAttestationAndProof{ + Message: ðpb.AggregateAttestationAndProof{ + Aggregate: ðpb.Attestation{ + Data: ðpb.AttestationData{ + CommitteeIndex: 123}}}} + att2 := ðpb.SignedAggregateAttestationAndProof{ + Message: ðpb.AggregateAttestationAndProof{ + Aggregate: ðpb.Attestation{ + Data: ðpb.AttestationData{ + CommitteeIndex: 456}}}} + assert.Equal(t, false, attsAreEqual(att1, att2)) + }) + t.Run("Electra equal", func(t *testing.T) { + cb1 := primitives.NewAttestationCommitteeBits() + cb1.SetBitAt(0, true) + att1 := ðpb.SignedAggregateAttestationAndProofElectra{ + Message: ðpb.AggregateAttestationAndProofElectra{ + Aggregate: ðpb.AttestationElectra{ + Data: ðpb.AttestationData{}, + CommitteeBits: cb1, + }}} + cb2 := primitives.NewAttestationCommitteeBits() + cb2.SetBitAt(0, true) + att2 := ðpb.SignedAggregateAttestationAndProofElectra{ + Message: ðpb.AggregateAttestationAndProofElectra{ + Aggregate: ðpb.AttestationElectra{ + Data: ðpb.AttestationData{}, + CommitteeBits: cb2, + }}} + assert.Equal(t, true, attsAreEqual(att1, att2)) + }) + t.Run("Electra not equal", func(t *testing.T) { + cb1 := primitives.NewAttestationCommitteeBits() + cb1.SetBitAt(0, true) + att1 := ðpb.SignedAggregateAttestationAndProofElectra{ + Message: ðpb.AggregateAttestationAndProofElectra{ + Aggregate: ðpb.AttestationElectra{ + Data: ðpb.AttestationData{}, + CommitteeBits: cb1, + }}} + cb2 := primitives.NewAttestationCommitteeBits() + cb2.SetBitAt(1, true) + att2 := ðpb.SignedAggregateAttestationAndProofElectra{ + Message: ðpb.AggregateAttestationAndProofElectra{ + Aggregate: ðpb.AttestationElectra{ + Data: ðpb.AttestationData{}, + CommitteeBits: cb2, + }}} + assert.Equal(t, false, attsAreEqual(att1, att2)) + }) } diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index c983709c52da..167b1c7db856 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -4,14 +4,12 @@ import ( libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" - "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/network/forks" "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" @@ -107,7 +105,7 @@ func readFirstChunkedBlock(stream libp2pcore.Stream, tor blockchain.TemporalOrac if err != nil { return nil, err } - blk, err := extractBlockDataType(rpcCtx, tor) + blk, err := extractDataType(types.BlockMap, rpcCtx, tor) if err != nil { return nil, err } @@ -131,7 +129,7 @@ func readResponseChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, if err != nil { return nil, err } - blk, err := extractBlockDataType(rpcCtx, tor) + blk, err := extractDataType(types.BlockMap, rpcCtx, tor) if err != nil { return nil, err } @@ -139,30 +137,6 @@ func readResponseChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, return blk, err } -func extractBlockDataType(digest []byte, tor blockchain.TemporalOracle) (interfaces.ReadOnlySignedBeaconBlock, error) { - if len(digest) == 0 { - bFunc, ok := types.BlockMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] - if !ok { - return nil, errors.New("no block type exists for the genesis fork version.") - } - return bFunc() - } - if len(digest) != forkDigestLength { - return nil, errors.Errorf("invalid digest returned, wanted a length of %d but received %d", forkDigestLength, len(digest)) - } - vRoot := tor.GenesisValidatorsRoot() - for k, blkFunc := range types.BlockMap { - rDigest, err := signing.ComputeForkDigest(k[:], vRoot[:]) - if err != nil { - return nil, err - } - if rDigest == bytesutil.ToBytes4(digest) { - return blkFunc() - } - } - return nil, errors.Wrapf(ErrNoValidDigest, "could not extract block data type, saw digest=%#x, genesis=%v, vr=%#x", digest, tor.GenesisTime(), tor.GenesisValidatorsRoot()) -} - // WriteBlobSidecarChunk writes blob chunk object to stream. // response_chunk ::= | | | func WriteBlobSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, sidecar blocks.VerifiedROBlob) error { diff --git a/beacon-chain/sync/rpc_chunked_response_test.go b/beacon-chain/sync/rpc_chunked_response_test.go deleted file mode 100644 index 3f5bb62fc92a..000000000000 --- a/beacon-chain/sync/rpc_chunked_response_test.go +++ /dev/null @@ -1,121 +0,0 @@ -package sync - -import ( - "reflect" - "testing" - - "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" - mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" - "github.com/prysmaticlabs/prysm/v5/config/params" - "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" - "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" - enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" - ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/testing/require" -) - -func TestExtractBlockDataType(t *testing.T) { - // Precompute digests - genDigest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, params.BeaconConfig().ZeroHash[:]) - require.NoError(t, err) - altairDigest, err := signing.ComputeForkDigest(params.BeaconConfig().AltairForkVersion, params.BeaconConfig().ZeroHash[:]) - require.NoError(t, err) - bellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, params.BeaconConfig().ZeroHash[:]) - require.NoError(t, err) - - type args struct { - digest []byte - chain blockchain.ChainInfoFetcher - } - tests := []struct { - name string - args args - want interfaces.ReadOnlySignedBeaconBlock - wantErr bool - }{ - { - name: "no digest", - args: args{ - digest: []byte{}, - chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, - }, - - want: func() interfaces.ReadOnlySignedBeaconBlock { - wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Body: ðpb.BeaconBlockBody{}}}) - require.NoError(t, err) - return wsb - }(), - wantErr: false, - }, - { - name: "invalid digest", - args: args{ - digest: []byte{0x00, 0x01}, - chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, - }, - want: nil, - wantErr: true, - }, - { - name: "non existent digest", - args: args{ - digest: []byte{0x00, 0x01, 0x02, 0x03}, - chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, - }, - want: nil, - wantErr: true, - }, - { - name: "genesis fork version", - args: args{ - digest: genDigest[:], - chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, - }, - want: func() interfaces.ReadOnlySignedBeaconBlock { - wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Body: ðpb.BeaconBlockBody{}}}) - require.NoError(t, err) - return wsb - }(), - wantErr: false, - }, - { - name: "altair fork version", - args: args{ - digest: altairDigest[:], - chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, - }, - want: func() interfaces.ReadOnlySignedBeaconBlock { - wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlockAltair{Block: ðpb.BeaconBlockAltair{Body: ðpb.BeaconBlockBodyAltair{}}}) - require.NoError(t, err) - return wsb - }(), - wantErr: false, - }, - { - name: "bellatrix fork version", - args: args{ - digest: bellatrixDigest[:], - chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, - }, - want: func() interfaces.ReadOnlySignedBeaconBlock { - wsb, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlockBellatrix{Block: ðpb.BeaconBlockBellatrix{Body: ðpb.BeaconBlockBodyBellatrix{ExecutionPayload: &enginev1.ExecutionPayload{}}}}) - require.NoError(t, err) - return wsb - }(), - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := extractBlockDataType(tt.args.digest, tt.args.chain) - if (err != nil) != tt.wantErr { - t.Errorf("extractBlockDataType() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("extractBlockDataType() got = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index 9134e74f3096..a9e8cef081b7 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -7,13 +7,9 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" - "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" - "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/network/forks" pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata" @@ -112,7 +108,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (metadata if err != nil { return nil, err } - msg, err := extractMetaDataType(rpcCtx[:], s.cfg.clock) + msg, err := extractDataType(types.MetaDataMap, rpcCtx[:], s.cfg.clock) if err != nil { return nil, err } @@ -133,27 +129,3 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (metadata } return msg, nil } - -func extractMetaDataType(digest []byte, tor blockchain.TemporalOracle) (metadata.Metadata, error) { - if len(digest) == 0 { - mdFunc, ok := types.MetaDataMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] - if !ok { - return nil, errors.New("no metadata type exists for the genesis fork version.") - } - return mdFunc(), nil - } - if len(digest) != forkDigestLength { - return nil, errors.Errorf("invalid digest returned, wanted a length of %d but received %d", forkDigestLength, len(digest)) - } - vRoot := tor.GenesisValidatorsRoot() - for k, mdFunc := range types.MetaDataMap { - rDigest, err := signing.ComputeForkDigest(k[:], vRoot[:]) - if err != nil { - return nil, err - } - if rDigest == bytesutil.ToBytes4(digest) { - return mdFunc(), nil - } - } - return nil, errors.Wrapf(ErrNoValidDigest, "could not extract metadata type, saw digest=%#x, genesis=%v, vr=%#x", digest, tor.GenesisTime(), tor.GenesisValidatorsRoot()) -} diff --git a/beacon-chain/sync/rpc_metadata_test.go b/beacon-chain/sync/rpc_metadata_test.go index 4f38ff329438..23eb74041f0b 100644 --- a/beacon-chain/sync/rpc_metadata_test.go +++ b/beacon-chain/sync/rpc_metadata_test.go @@ -2,16 +2,13 @@ package sync import ( "context" - "reflect" "sync" "testing" "time" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" db "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" @@ -21,7 +18,6 @@ import ( leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v5/encoding/ssz/equality" pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata" "github.com/prysmaticlabs/prysm/v5/testing/assert" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" @@ -233,80 +229,3 @@ func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) { t.Error("Peer is disconnected despite receiving a valid ping") } } - -func TestExtractMetaDataType(t *testing.T) { - // Precompute digests - genDigest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, params.BeaconConfig().ZeroHash[:]) - require.NoError(t, err) - altairDigest, err := signing.ComputeForkDigest(params.BeaconConfig().AltairForkVersion, params.BeaconConfig().ZeroHash[:]) - require.NoError(t, err) - - type args struct { - digest []byte - clock blockchain.TemporalOracle - } - tests := []struct { - name string - args args - want metadata.Metadata - wantErr bool - }{ - { - name: "no digest", - args: args{ - digest: []byte{}, - clock: startup.NewClock(time.Now(), [32]byte{}), - }, - want: wrapper.WrappedMetadataV0(&pb.MetaDataV0{}), - wantErr: false, - }, - { - name: "invalid digest", - args: args{ - digest: []byte{0x00, 0x01}, - clock: startup.NewClock(time.Now(), [32]byte{}), - }, - want: nil, - wantErr: true, - }, - { - name: "non existent digest", - args: args{ - digest: []byte{0x00, 0x01, 0x02, 0x03}, - clock: startup.NewClock(time.Now(), [32]byte{}), - }, - want: nil, - wantErr: true, - }, - { - name: "genesis fork version", - args: args{ - digest: genDigest[:], - clock: startup.NewClock(time.Now(), [32]byte{}), - }, - want: wrapper.WrappedMetadataV0(&pb.MetaDataV0{}), - wantErr: false, - }, - { - name: "altair fork version", - args: args{ - digest: altairDigest[:], - clock: startup.NewClock(time.Now(), [32]byte{}), - }, - want: wrapper.WrappedMetadataV1(&pb.MetaDataV1{}), - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := extractMetaDataType(tt.args.digest, tt.args.clock) - if (err != nil) != tt.wantErr { - t.Errorf("extractMetaDataType() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("extractMetaDataType() got = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 9736fc9b994e..15196bf6ca74 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -122,7 +122,7 @@ type Service struct { cancel context.CancelFunc slotToPendingBlocks *gcache.Cache seenPendingBlocks map[[32]byte]bool - blkRootToPendingAtts map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof + blkRootToPendingAtts map[[32]byte][]ethpb.SignedAggregateAttAndProof subHandler *subTopicHandler pendingAttsLock sync.RWMutex pendingQueueLock sync.RWMutex @@ -171,7 +171,7 @@ func NewService(ctx context.Context, opts ...Option) *Service { cfg: &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})}, slotToPendingBlocks: c, seenPendingBlocks: make(map[[32]byte]bool), - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), signatureChan: make(chan *signatureVerifier, verifierLimit), } for _, opt := range opts { diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 2df43706166d..06380ec948f2 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -13,19 +13,21 @@ import ( // beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the // attestation pool for processing. func (s *Service) beaconAggregateProofSubscriber(_ context.Context, msg proto.Message) error { - a, ok := msg.(*ethpb.SignedAggregateAttestationAndProof) + a, ok := msg.(ethpb.SignedAggregateAttAndProof) if !ok { - return fmt.Errorf("message was not type *ethpb.SignedAggregateAttestationAndProof, type=%T", msg) + return fmt.Errorf("message was not type ethpb.SignedAggregateAttAndProof, type=%T", msg) } - if a.Message.Aggregate == nil || a.Message.Aggregate.Data == nil { + aggregate := a.AggregateAttestationAndProof().AggregateVal() + + if aggregate == nil || aggregate.GetData() == nil { return errors.New("nil aggregate") } // An unaggregated attestation can make it here. It’s valid, the aggregator it just itself, although it means poor performance for the subnet. - if !helpers.IsAggregated(a.Message.Aggregate) { - return s.cfg.attPool.SaveUnaggregatedAttestation(a.Message.Aggregate) + if !helpers.IsAggregated(aggregate) { + return s.cfg.attPool.SaveUnaggregatedAttestation(aggregate) } - return s.cfg.attPool.SaveAggregatedAttestation(a.Message.Aggregate) + return s.cfg.attPool.SaveAggregatedAttestation(aggregate) } diff --git a/beacon-chain/sync/subscriber_beacon_attestation.go b/beacon-chain/sync/subscriber_beacon_attestation.go index 512467392796..ecac1c85291b 100644 --- a/beacon-chain/sync/subscriber_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_beacon_attestation.go @@ -15,19 +15,21 @@ import ( ) func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, msg proto.Message) error { - a, ok := msg.(*eth.Attestation) + a, ok := msg.(eth.Att) if !ok { - return fmt.Errorf("message was not type *eth.Attestation, type=%T", msg) + return fmt.Errorf("message was not type eth.Att, type=%T", msg) } - if a.Data == nil { + data := a.GetData() + + if data == nil { return errors.New("nil attestation") } - s.setSeenCommitteeIndicesSlot(a.Data.Slot, a.Data.CommitteeIndex, a.AggregationBits) + s.setSeenCommitteeIndicesSlot(data.Slot, data.CommitteeIndex, a.GetAggregationBits()) exists, err := s.cfg.attPool.HasAggregatedAttestation(a) if err != nil { - return errors.Wrap(err, "Could not determine if attestation pool has this atttestation") + return errors.Wrap(err, "could not determine if attestation pool has this attestation") } if exists { return nil diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 893c3670b5cf..414e89db867c 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -20,6 +20,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/runtime/version" prysmTime "github.com/prysmaticlabs/prysm/v5/time" "github.com/prysmaticlabs/prysm/v5/time/slots" "go.opencensus.io/trace" @@ -47,38 +48,50 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms tracing.AnnotateError(span, err) return pubsub.ValidationReject, err } - m, ok := raw.(*ethpb.SignedAggregateAttestationAndProof) + m, ok := raw.(ethpb.SignedAggregateAttAndProof) if !ok { return pubsub.ValidationReject, errors.Errorf("invalid message type: %T", raw) } - if m.Message == nil { + if m.AggregateAttestationAndProof() == nil { return pubsub.ValidationReject, errNilMessage } - if err := helpers.ValidateNilAttestation(m.Message.Aggregate); err != nil { + + aggregate := m.AggregateAttestationAndProof().AggregateVal() + data := aggregate.GetData() + + if err := helpers.ValidateNilAttestation(aggregate); err != nil { return pubsub.ValidationReject, err } // Do not process slot 0 aggregates. - if m.Message.Aggregate.Data.Slot == 0 { + if data.Slot == 0 { return pubsub.ValidationIgnore, nil } // Broadcast the aggregated attestation on a feed to notify other services in the beacon node // of a received aggregated attestation. - s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ - Type: operation.AggregatedAttReceived, - Data: &operation.AggregatedAttReceivedData{ - Attestation: m.Message, - }, - }) - - if err := helpers.ValidateSlotTargetEpoch(m.Message.Aggregate.Data); err != nil { + + // TODO: this will be extended to Electra in a later PR + + if m.Version() == version.Phase0 { + phase0Att, ok := m.(*ethpb.SignedAggregateAttestationAndProof) + if ok { + s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{ + Type: operation.AggregatedAttReceived, + Data: &operation.AggregatedAttReceivedData{ + Attestation: phase0Att.Message, + }, + }) + } + } + + if err := helpers.ValidateSlotTargetEpoch(data); err != nil { return pubsub.ValidationReject, err } // Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation // processing tolerance. if err := helpers.ValidateAttestationTime( - m.Message.Aggregate.Data.Slot, + data.Slot, s.cfg.clock.GenesisTime(), earlyAttestationProcessingTolerance, ); err != nil { @@ -87,19 +100,19 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms } // Verify this is the first aggregate received from the aggregator with index and slot. - if s.hasSeenAggregatorIndexEpoch(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex) { + if s.hasSeenAggregatorIndexEpoch(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()) { return pubsub.ValidationIgnore, nil } // Check that the block being voted on isn't invalid. - if s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.BeaconBlockRoot)) || - s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.Target.Root)) || - s.hasBadBlock(bytesutil.ToBytes32(m.Message.Aggregate.Data.Source.Root)) { + if s.hasBadBlock(bytesutil.ToBytes32(data.BeaconBlockRoot)) || + s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) || + s.hasBadBlock(bytesutil.ToBytes32(data.Source.Root)) { attBadBlockCount.Inc() return pubsub.ValidationReject, errors.New("bad block referenced in attestation data") } // Verify aggregate attestation has not already been seen via aggregate gossip, within a block, or through the creation locally. - seen, err := s.cfg.attPool.HasAggregatedAttestation(m.Message.Aggregate) + seen, err := s.cfg.attPool.HasAggregatedAttestation(aggregate) if err != nil { tracing.AnnotateError(span, err) return pubsub.ValidationIgnore, err @@ -116,7 +129,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms return validationRes, err } - s.setAggregatorIndexEpochSeen(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex) + s.setAggregatorIndexEpochSeen(data.Target.Epoch, m.AggregateAttestationAndProof().GetAggregatorIndex()) msg.ValidatorData = m @@ -125,44 +138,75 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms return pubsub.ValidationAccept, nil } -func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.SignedAggregateAttestationAndProof) (pubsub.ValidationResult, error) { +func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.SignedAggregateAttAndProof) (pubsub.ValidationResult, error) { ctx, span := trace.StartSpan(ctx, "sync.validateAggregatedAtt") defer span.End() + aggregateAndProof := signed.AggregateAttestationAndProof() + aggregatorIndex := aggregateAndProof.GetAggregatorIndex() + aggregate := aggregateAndProof.AggregateVal() + data := aggregate.GetData() + // Verify attestation target root is consistent with the head root. // This verification is not in the spec, however we guard against it as it opens us up // to weird edge cases during verification. The attestation technically could be used to add value to a block, // but it's invalid in the spirit of the protocol. Here we choose safety over profit. - if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, signed.Message.Aggregate); err != nil { + if err := s.cfg.chain.VerifyLmdFfgConsistency(ctx, aggregate); err != nil { tracing.AnnotateError(span, err) attBadLmdConsistencyCount.Inc() return pubsub.ValidationReject, err } // Verify current finalized checkpoint is an ancestor of the block defined by the attestation's beacon block root. - if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(signed.Message.Aggregate.Data.BeaconBlockRoot)) { + if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) { tracing.AnnotateError(span, blockchain.ErrNotDescendantOfFinalized) return pubsub.ValidationIgnore, blockchain.ErrNotDescendantOfFinalized } - bs, err := s.cfg.chain.AttestationTargetState(ctx, signed.Message.Aggregate.Data.Target) + bs, err := s.cfg.chain.AttestationTargetState(ctx, data.Target) if err != nil { tracing.AnnotateError(span, err) return pubsub.ValidationIgnore, err } // Verify validator index is within the beacon committee. - result, err := s.validateIndexInCommittee(ctx, bs, signed.Message.Aggregate, signed.Message.AggregatorIndex) + result, err := s.validateIndexInCommittee(ctx, bs, aggregate, aggregatorIndex) if result != pubsub.ValidationAccept { wrappedErr := errors.Wrapf(err, "Could not validate index in committee") tracing.AnnotateError(span, wrappedErr) return result, wrappedErr } + var committeeIndex primitives.CommitteeIndex + if signed.Version() >= version.Electra { + a, ok := aggregate.(*ethpb.AttestationElectra) + // This will never fail in practice because we asserted the version + if !ok { + err := errors.New("attestation has wrong type") + tracing.AnnotateError(span, err) + return pubsub.ValidationReject, err + } + committeeIndex, result, err = validateCommitteeIndexElectra(ctx, a) + if result != pubsub.ValidationAccept { + wrappedErr := errors.Wrapf(err, "could not validate committee index for Electra version") + tracing.AnnotateError(span, wrappedErr) + return result, wrappedErr + } + } else { + committeeIndex = data.CommitteeIndex + } + // Verify selection proof reflects to the right validator. - selectionSigSet, err := validateSelectionIndex(ctx, bs, signed.Message.Aggregate.Data, signed.Message.AggregatorIndex, signed.Message.SelectionProof) + selectionSigSet, err := validateSelectionIndex( + ctx, + bs, + data.Slot, + committeeIndex, + aggregatorIndex, + aggregateAndProof.GetSelectionProof(), + ) if err != nil { - wrappedErr := errors.Wrapf(err, "Could not validate selection for validator %d", signed.Message.AggregatorIndex) + wrappedErr := errors.Wrapf(err, "Could not validate selection for validator %d", aggregateAndProof.GetAggregatorIndex()) tracing.AnnotateError(span, wrappedErr) attBadSelectionProofCount.Inc() return pubsub.ValidationReject, wrappedErr @@ -172,13 +216,13 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe // We use batch verify here to save compute. aggregatorSigSet, err := aggSigSet(bs, signed) if err != nil { - wrappedErr := errors.Wrapf(err, "Could not get aggregator sig set %d", signed.Message.AggregatorIndex) + wrappedErr := errors.Wrapf(err, "Could not get aggregator sig set %d", aggregatorIndex) tracing.AnnotateError(span, wrappedErr) return pubsub.ValidationIgnore, wrappedErr } - attSigSet, err := blocks.AttestationSignatureBatch(ctx, bs, []ethpb.Att{signed.Message.Aggregate}) + attSigSet, err := blocks.AttestationSignatureBatch(ctx, bs, []ethpb.Att{aggregate}) if err != nil { - wrappedErr := errors.Wrapf(err, "Could not verify aggregator signature %d", signed.Message.AggregatorIndex) + wrappedErr := errors.Wrapf(err, "Could not verify aggregator signature %d", aggregatorIndex) tracing.AnnotateError(span, wrappedErr) return pubsub.ValidationIgnore, wrappedErr } @@ -188,10 +232,9 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed *ethpb.Signe return s.validateWithBatchVerifier(ctx, "aggregate", set) } -func (s *Service) validateBlockInAttestation(ctx context.Context, satt *ethpb.SignedAggregateAttestationAndProof) bool { - a := satt.Message +func (s *Service) validateBlockInAttestation(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool { // Verify the block being voted and the processed state is in beaconDB. The block should have passed validation if it's in the beaconDB. - blockRoot := bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot) + blockRoot := bytesutil.ToBytes32(satt.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot) if !s.hasBlockAndState(ctx, blockRoot) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. s.savePendingAtt(satt) @@ -234,7 +277,7 @@ func (s *Service) validateIndexInCommittee(ctx context.Context, bs state.ReadOnl return result, err } - committee, result, err := s.validateBitLength(ctx, a, bs) + committee, result, err := s.validateBitLength(ctx, bs, a.GetData().Slot, a.GetData().CommitteeIndex, a.GetAggregationBits()) if result != pubsub.ValidationAccept { return result, err } @@ -262,14 +305,15 @@ func (s *Service) validateIndexInCommittee(ctx context.Context, bs state.ReadOnl func validateSelectionIndex( ctx context.Context, bs state.ReadOnlyBeaconState, - data *ethpb.AttestationData, + slot primitives.Slot, + committeeIndex primitives.CommitteeIndex, validatorIndex primitives.ValidatorIndex, proof []byte, ) (*bls.SignatureBatch, error) { ctx, span := trace.StartSpan(ctx, "sync.validateSelectionIndex") defer span.End() - committee, err := helpers.BeaconCommitteeFromState(ctx, bs, data.Slot, data.CommitteeIndex) + committee, err := helpers.BeaconCommitteeFromState(ctx, bs, slot, committeeIndex) if err != nil { return nil, err } @@ -278,11 +322,11 @@ func validateSelectionIndex( return nil, err } if !aggregator { - return nil, fmt.Errorf("validator is not an aggregator for slot %d", data.Slot) + return nil, fmt.Errorf("validator is not an aggregator for slot %d", slot) } domain := params.BeaconConfig().DomainSelectionProof - epoch := slots.ToEpoch(data.Slot) + epoch := slots.ToEpoch(slot) v, err := bs.ValidatorAtIndex(validatorIndex) if err != nil { @@ -297,7 +341,7 @@ func validateSelectionIndex( if err != nil { return nil, err } - sszUint := primitives.SSZUint64(data.Slot) + sszUint := primitives.SSZUint64(slot) root, err := signing.ComputeSigningRoot(&sszUint, d) if err != nil { return nil, err @@ -311,8 +355,10 @@ func validateSelectionIndex( } // This returns aggregator signature set which can be used to batch verify. -func aggSigSet(s state.ReadOnlyBeaconState, a *ethpb.SignedAggregateAttestationAndProof) (*bls.SignatureBatch, error) { - v, err := s.ValidatorAtIndex(a.Message.AggregatorIndex) +func aggSigSet(s state.ReadOnlyBeaconState, a ethpb.SignedAggregateAttAndProof) (*bls.SignatureBatch, error) { + aggregateAndProof := a.AggregateAttestationAndProof() + + v, err := s.ValidatorAtIndex(aggregateAndProof.GetAggregatorIndex()) if err != nil { return nil, err } @@ -321,17 +367,17 @@ func aggSigSet(s state.ReadOnlyBeaconState, a *ethpb.SignedAggregateAttestationA return nil, err } - epoch := slots.ToEpoch(a.Message.Aggregate.Data.Slot) + epoch := slots.ToEpoch(aggregateAndProof.AggregateVal().GetData().Slot) d, err := signing.Domain(s.Fork(), epoch, params.BeaconConfig().DomainAggregateAndProof, s.GenesisValidatorsRoot()) if err != nil { return nil, err } - root, err := signing.ComputeSigningRoot(a.Message, d) + root, err := signing.ComputeSigningRoot(aggregateAndProof, d) if err != nil { return nil, err } return &bls.SignatureBatch{ - Signatures: [][]byte{a.Signature}, + Signatures: [][]byte{a.GetSignature()}, PublicKeys: []bls.PublicKey{publicKey}, Messages: [][32]byte{root}, Descriptions: []string{signing.AggregatorSignature}, diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index 1c4da525bf06..a8e5539aed13 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -117,7 +117,7 @@ func TestVerifySelection_NotAnAggregator(t *testing.T) { sig := privKeys[0].Sign([]byte{'A'}) data := util.HydrateAttestationData(ðpb.AttestationData{}) - _, err := validateSelectionIndex(ctx, beaconState, data, 0, sig.Marshal()) + _, err := validateSelectionIndex(ctx, beaconState, data.Slot, data.CommitteeIndex, 0, sig.Marshal()) wanted := "validator is not an aggregator for slot" assert.ErrorContains(t, wanted, err) } @@ -149,7 +149,7 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) { attPool: attestations.NewPool(), chain: &mock.ChainService{}, }, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), seenAggregatedAttestationCache: c, } r.initCaches() @@ -302,7 +302,7 @@ func TestValidateAggregateAndProof_ExistedInPool(t *testing.T) { attestationNotifier: (&mock.ChainService{}).OperationNotifier(), }, seenAggregatedAttestationCache: lruwrpr.New(10), - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), } r.initCaches() diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index c37abab2191a..2b7e4a964dc8 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -9,6 +9,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" + "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" @@ -22,6 +23,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation" + "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" "go.opencensus.io/trace" ) @@ -33,6 +35,8 @@ import ( // - attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot). // - The signature of attestation is valid. func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { + var validationRes pubsub.ValidationResult + if pid == s.cfg.p2p.PeerID() { return pubsub.ValidationAccept, nil } @@ -55,16 +59,18 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p return pubsub.ValidationReject, err } - att, ok := m.(*eth.Attestation) + att, ok := m.(eth.Att) if !ok { return pubsub.ValidationReject, errWrongMessage } + data := att.GetData() + if err := helpers.ValidateNilAttestation(att); err != nil { return pubsub.ValidationReject, err } // Do not process slot 0 attestations. - if att.Data.Slot == 0 { + if data.Slot == 0 { return pubsub.ValidationIgnore, nil } // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node @@ -78,15 +84,34 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p // Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation // processing tolerance. - if err := helpers.ValidateAttestationTime(att.Data.Slot, s.cfg.clock.GenesisTime(), + if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(), earlyAttestationProcessingTolerance); err != nil { tracing.AnnotateError(span, err) return pubsub.ValidationIgnore, err } - if err := helpers.ValidateSlotTargetEpoch(att.Data); err != nil { + if err := helpers.ValidateSlotTargetEpoch(data); err != nil { return pubsub.ValidationReject, err } + var committeeIndex primitives.CommitteeIndex + if att.Version() >= version.Electra { + a, ok := att.(*eth.AttestationElectra) + // This will never fail in practice because we asserted the version + if !ok { + err := errors.New("attestation has wrong type") + tracing.AnnotateError(span, err) + return pubsub.ValidationReject, err + } + committeeIndex, validationRes, err = validateCommitteeIndexElectra(ctx, a) + if validationRes != pubsub.ValidationAccept { + wrappedErr := errors.Wrapf(err, "could not validate committee index for Electra version") + tracing.AnnotateError(span, wrappedErr) + return validationRes, wrappedErr + } + } else { + committeeIndex = data.CommitteeIndex + } + if features.Get().EnableSlasher { // Feed the indexed attestation to slasher if enabled. This action // is done in the background to avoid adding more load to this critical code path. @@ -94,13 +119,13 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p // Using a different context to prevent timeouts as this operation can be expensive // and we want to avoid affecting the critical code path. ctx := context.TODO() - preState, err := s.cfg.chain.AttestationTargetState(ctx, att.Data.Target) + preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target) if err != nil { log.WithError(err).Error("Could not retrieve pre state") tracing.AnnotateError(span, err) return } - committee, err := helpers.BeaconCommitteeFromState(ctx, preState, att.Data.Slot, att.Data.CommitteeIndex) + committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, committeeIndex) if err != nil { log.WithError(err).Error("Could not get attestation committee") tracing.AnnotateError(span, err) @@ -117,27 +142,41 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p } // Verify this the first attestation received for the participating validator for the slot. - if s.hasSeenCommitteeIndicesSlot(att.Data.Slot, att.Data.CommitteeIndex, att.AggregationBits) { + if s.hasSeenCommitteeIndicesSlot(data.Slot, data.CommitteeIndex, att.GetAggregationBits()) { return pubsub.ValidationIgnore, nil } // Reject an attestation if it references an invalid block. - if s.hasBadBlock(bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) || - s.hasBadBlock(bytesutil.ToBytes32(att.Data.Target.Root)) || - s.hasBadBlock(bytesutil.ToBytes32(att.Data.Source.Root)) { + if s.hasBadBlock(bytesutil.ToBytes32(data.BeaconBlockRoot)) || + s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) || + s.hasBadBlock(bytesutil.ToBytes32(data.Source.Root)) { attBadBlockCount.Inc() return pubsub.ValidationReject, errors.New("attestation data references bad block root") } // Verify the block being voted and the processed state is in beaconDB and the block has passed validation if it's in the beaconDB. - blockRoot := bytesutil.ToBytes32(att.Data.BeaconBlockRoot) + blockRoot := bytesutil.ToBytes32(data.BeaconBlockRoot) if !s.hasBlockAndState(ctx, blockRoot) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. - s.savePendingAtt(ð.SignedAggregateAttestationAndProof{Message: ð.AggregateAttestationAndProof{Aggregate: att}}) + if att.Version() >= version.Electra { + a, ok := att.(*eth.AttestationElectra) + // This will never fail in practice because we asserted the version + if !ok { + return pubsub.ValidationReject, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.AttestationElectra{}, att) + } + s.savePendingAtt(ð.SignedAggregateAttestationAndProofElectra{Message: ð.AggregateAttestationAndProofElectra{Aggregate: a}}) + } else { + a, ok := att.(*eth.Attestation) + // This will never fail in practice because we asserted the version + if !ok { + return pubsub.ValidationReject, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.Attestation{}, att) + } + s.savePendingAtt(ð.SignedAggregateAttestationAndProof{Message: ð.AggregateAttestationAndProof{Aggregate: a}}) + } return pubsub.ValidationIgnore, nil } - if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) { + if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) { tracing.AnnotateError(span, blockchain.ErrNotDescendantOfFinalized) return pubsub.ValidationIgnore, blockchain.ErrNotDescendantOfFinalized } @@ -147,13 +186,13 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p return pubsub.ValidationReject, err } - preState, err := s.cfg.chain.AttestationTargetState(ctx, att.Data.Target) + preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target) if err != nil { tracing.AnnotateError(span, err) return pubsub.ValidationIgnore, err } - validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic) + validationRes, err = s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic) if validationRes != pubsub.ValidationAccept { return validationRes, err } @@ -163,7 +202,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p return validationRes, err } - s.setSeenCommitteeIndicesSlot(att.Data.Slot, att.Data.CommitteeIndex, att.AggregationBits) + s.setSeenCommitteeIndicesSlot(data.Slot, data.CommitteeIndex, att.GetAggregationBits()) msg.ValidatorData = att @@ -211,7 +250,7 @@ func (s *Service) validateUnaggregatedAttWithState(ctx context.Context, a eth.At ctx, span := trace.StartSpan(ctx, "sync.validateUnaggregatedAttWithState") defer span.End() - committee, result, err := s.validateBitLength(ctx, a, bs) + committee, result, err := s.validateBitLength(ctx, bs, a.GetData().Slot, a.GetData().CommitteeIndex, a.GetAggregationBits()) if result != pubsub.ValidationAccept { return result, err } @@ -232,14 +271,20 @@ func (s *Service) validateUnaggregatedAttWithState(ctx context.Context, a eth.At return s.validateWithBatchVerifier(ctx, "attestation", set) } -func (s *Service) validateBitLength(ctx context.Context, a eth.Att, bs state.ReadOnlyBeaconState) ([]primitives.ValidatorIndex, pubsub.ValidationResult, error) { - committee, err := helpers.BeaconCommitteeFromState(ctx, bs, a.GetData().Slot, a.GetData().CommitteeIndex) +func (s *Service) validateBitLength( + ctx context.Context, + bs state.ReadOnlyBeaconState, + slot primitives.Slot, + committeeIndex primitives.CommitteeIndex, + aggregationBits bitfield.Bitlist, +) ([]primitives.ValidatorIndex, pubsub.ValidationResult, error) { + committee, err := helpers.BeaconCommitteeFromState(ctx, bs, slot, committeeIndex) if err != nil { return nil, pubsub.ValidationIgnore, err } // Verify number of aggregation bits matches the committee size. - if err := helpers.VerifyBitfieldLength(a.GetAggregationBits(), uint64(len(committee))); err != nil { + if err := helpers.VerifyBitfieldLength(aggregationBits, uint64(len(committee))); err != nil { return nil, pubsub.ValidationReject, err } diff --git a/beacon-chain/sync/validate_beacon_attestation_electra.go b/beacon-chain/sync/validate_beacon_attestation_electra.go new file mode 100644 index 000000000000..3ed3fe9c1899 --- /dev/null +++ b/beacon-chain/sync/validate_beacon_attestation_electra.go @@ -0,0 +1,27 @@ +package sync + +import ( + "context" + "fmt" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "go.opencensus.io/trace" +) + +func validateCommitteeIndexElectra(ctx context.Context, a *ethpb.AttestationElectra) (primitives.CommitteeIndex, pubsub.ValidationResult, error) { + _, span := trace.StartSpan(ctx, "sync.validateCommitteeIndexElectra") + defer span.End() + + ci := a.Data.CommitteeIndex + if ci != 0 { + return 0, pubsub.ValidationReject, fmt.Errorf("committee index must be 0 but was %d", ci) + } + committeeIndices := helpers.CommitteeIndices(a.CommitteeBits) + if len(committeeIndices) != 1 { + return 0, pubsub.ValidationReject, fmt.Errorf("exactly 1 committee index must be set but %d were set", len(committeeIndices)) + } + return committeeIndices[0], pubsub.ValidationAccept, nil +} diff --git a/beacon-chain/sync/validate_beacon_attestation_electra_test.go b/beacon-chain/sync/validate_beacon_attestation_electra_test.go new file mode 100644 index 000000000000..81c8e2e9dff9 --- /dev/null +++ b/beacon-chain/sync/validate_beacon_attestation_electra_test.go @@ -0,0 +1,46 @@ +package sync + +import ( + "context" + "testing" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/testing/assert" + "github.com/prysmaticlabs/prysm/v5/testing/require" +) + +func Test_validateCommitteeIndexElectra(t *testing.T) { + ctx := context.Background() + + t.Run("valid", func(t *testing.T) { + cb := primitives.NewAttestationCommitteeBits() + cb.SetBitAt(1, true) + ci, res, err := validateCommitteeIndexElectra(ctx, ðpb.AttestationElectra{Data: ðpb.AttestationData{}, CommitteeBits: cb}) + require.NoError(t, err) + assert.Equal(t, pubsub.ValidationAccept, res) + assert.Equal(t, primitives.CommitteeIndex(1), ci) + }) + t.Run("non-zero data committee index", func(t *testing.T) { + cb := primitives.NewAttestationCommitteeBits() + cb.SetBitAt(1, true) + _, res, err := validateCommitteeIndexElectra(ctx, ðpb.AttestationElectra{Data: ðpb.AttestationData{CommitteeIndex: 1}, CommitteeBits: cb}) + assert.NotNil(t, err) + assert.Equal(t, pubsub.ValidationReject, res) + }) + t.Run("no committee bits set", func(t *testing.T) { + cb := primitives.NewAttestationCommitteeBits() + _, res, err := validateCommitteeIndexElectra(ctx, ðpb.AttestationElectra{Data: ðpb.AttestationData{}, CommitteeBits: cb}) + assert.NotNil(t, err) + assert.Equal(t, pubsub.ValidationReject, res) + }) + t.Run("more than 1 committee bit set", func(t *testing.T) { + cb := primitives.NewAttestationCommitteeBits() + cb.SetBitAt(0, true) + cb.SetBitAt(1, true) + _, res, err := validateCommitteeIndexElectra(ctx, ðpb.AttestationElectra{Data: ðpb.AttestationData{}, CommitteeBits: cb}) + assert.NotNil(t, err) + assert.Equal(t, pubsub.ValidationReject, res) + }) +} diff --git a/beacon-chain/sync/validate_beacon_attestation_test.go b/beacon-chain/sync/validate_beacon_attestation_test.go index 2db075aa2d28..2451352fb421 100644 --- a/beacon-chain/sync/validate_beacon_attestation_test.go +++ b/beacon-chain/sync/validate_beacon_attestation_test.go @@ -49,7 +49,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), attestationNotifier: (&mockChain.ChainService{}).OperationNotifier(), }, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), seenUnAggregatedAttestationCache: lruwrpr.New(10), signatureChan: make(chan *signatureVerifier, verifierLimit), } @@ -290,7 +290,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { m.Message.Topic = nil } - res, err := s.validateCommitteeIndexBeaconAttestation(ctx, "" /*peerID*/, m) + res, err := s.validateCommitteeIndexBeaconAttestation(ctx, "", m) received := res == pubsub.ValidationAccept if received != tt.want { t.Fatalf("Did not received wanted validation. Got %v, wanted %v", !tt.want, tt.want) From ebe165ba5997202d59cf08a95244ee91fc3ed14b Mon Sep 17 00:00:00 2001 From: rkapka Date: Thu, 6 Jun 2024 15:50:39 +0200 Subject: [PATCH 02/10] small cleanup --- beacon-chain/sync/validate_aggregate_proof.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 414e89db867c..fde3b718c4db 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -69,9 +69,7 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms // Broadcast the aggregated attestation on a feed to notify other services in the beacon node // of a received aggregated attestation. - // TODO: this will be extended to Electra in a later PR - if m.Version() == version.Phase0 { phase0Att, ok := m.(*ethpb.SignedAggregateAttestationAndProof) if ok { @@ -172,7 +170,7 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed // Verify validator index is within the beacon committee. result, err := s.validateIndexInCommittee(ctx, bs, aggregate, aggregatorIndex) if result != pubsub.ValidationAccept { - wrappedErr := errors.Wrapf(err, "Could not validate index in committee") + wrappedErr := errors.Wrapf(err, "could not validate index in committee") tracing.AnnotateError(span, wrappedErr) return result, wrappedErr } @@ -206,7 +204,7 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed aggregateAndProof.GetSelectionProof(), ) if err != nil { - wrappedErr := errors.Wrapf(err, "Could not validate selection for validator %d", aggregateAndProof.GetAggregatorIndex()) + wrappedErr := errors.Wrapf(err, "could not validate selection for validator %d", aggregateAndProof.GetAggregatorIndex()) tracing.AnnotateError(span, wrappedErr) attBadSelectionProofCount.Inc() return pubsub.ValidationReject, wrappedErr @@ -216,13 +214,13 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed // We use batch verify here to save compute. aggregatorSigSet, err := aggSigSet(bs, signed) if err != nil { - wrappedErr := errors.Wrapf(err, "Could not get aggregator sig set %d", aggregatorIndex) + wrappedErr := errors.Wrapf(err, "could not get aggregator sig set %d", aggregatorIndex) tracing.AnnotateError(span, wrappedErr) return pubsub.ValidationIgnore, wrappedErr } attSigSet, err := blocks.AttestationSignatureBatch(ctx, bs, []ethpb.Att{aggregate}) if err != nil { - wrappedErr := errors.Wrapf(err, "Could not verify aggregator signature %d", aggregatorIndex) + wrappedErr := errors.Wrapf(err, "could not verify aggregator signature %d", aggregatorIndex) tracing.AnnotateError(span, wrappedErr) return pubsub.ValidationIgnore, wrappedErr } From a1b20c7fb5a53cefa2ed71c0a1364e78343db41b Mon Sep 17 00:00:00 2001 From: rkapka Date: Thu, 6 Jun 2024 15:58:31 +0200 Subject: [PATCH 03/10] fuzz fix --- beacon-chain/sync/fuzz_exports.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/fuzz_exports.go b/beacon-chain/sync/fuzz_exports.go index ca091afe0129..b98ded988a0f 100644 --- a/beacon-chain/sync/fuzz_exports.go +++ b/beacon-chain/sync/fuzz_exports.go @@ -22,7 +22,7 @@ func NewRegularSyncFuzz(opts ...Option) *Service { cancel: cancel, slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool), - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), } r.rateLimiter = newRateLimiter(r.cfg.p2p) From 2d57082a00a98a9f9aa6a8c29bb355457725e516 Mon Sep 17 00:00:00 2001 From: rkapka Date: Thu, 6 Jun 2024 16:17:32 +0200 Subject: [PATCH 04/10] deepsource --- beacon-chain/sync/decode_pubsub.go | 2 +- beacon-chain/sync/subscriber_beacon_attestation.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index 57661f527106..d8d0e1069547 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -81,7 +81,7 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err } // Replaces our fork digest with the formatter. -func (_ *Service) replaceForkDigest(topic string) (string, error) { +func (*Service) replaceForkDigest(topic string) (string, error) { subStrings := strings.Split(topic, "/") if len(subStrings) != 4 { return "", errInvalidTopic diff --git a/beacon-chain/sync/subscriber_beacon_attestation.go b/beacon-chain/sync/subscriber_beacon_attestation.go index ecac1c85291b..d6d280c38811 100644 --- a/beacon-chain/sync/subscriber_beacon_attestation.go +++ b/beacon-chain/sync/subscriber_beacon_attestation.go @@ -38,11 +38,11 @@ func (s *Service) committeeIndexBeaconAttestationSubscriber(_ context.Context, m return s.cfg.attPool.SaveUnaggregatedAttestation(a) } -func (_ *Service) persistentSubnetIndices() []uint64 { +func (*Service) persistentSubnetIndices() []uint64 { return cache.SubnetIDs.GetAllSubnets() } -func (_ *Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { +func (*Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 { endEpoch := slots.ToEpoch(currentSlot) + 1 endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) var commIds []uint64 @@ -52,7 +52,7 @@ func (_ *Service) aggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 return slice.SetUint64(commIds) } -func (_ *Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 { +func (*Service) attesterSubnetIndices(currentSlot primitives.Slot) []uint64 { endEpoch := slots.ToEpoch(currentSlot) + 1 endSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(endEpoch)) var commIds []uint64 From e4b2789a2c8dba9adf5d25f84ba3b0049c0a872e Mon Sep 17 00:00:00 2001 From: rkapka Date: Mon, 17 Jun 2024 13:01:24 +0200 Subject: [PATCH 05/10] review --- beacon-chain/p2p/types/BUILD.bazel | 1 + beacon-chain/p2p/types/object_mapping_test.go | 32 ++++++++++++++++--- beacon-chain/sync/decode_pubsub.go | 11 ++++--- .../sync/pending_attestations_queue_test.go | 2 +- beacon-chain/sync/validate_aggregate_proof.go | 4 +-- .../sync/validate_beacon_attestation.go | 6 ++-- 6 files changed, 41 insertions(+), 15 deletions(-) diff --git a/beacon-chain/p2p/types/BUILD.bazel b/beacon-chain/p2p/types/BUILD.bazel index 2934b1413ba6..a66fcefd9e09 100644 --- a/beacon-chain/p2p/types/BUILD.bazel +++ b/beacon-chain/p2p/types/BUILD.bazel @@ -43,6 +43,7 @@ go_test( "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", "//proto/prysm/v1alpha1:go_default_library", + "//runtime/version:go_default_library", "//testing/assert:go_default_library", "//testing/require:go_default_library", "@com_github_prysmaticlabs_fastssz//:go_default_library", diff --git a/beacon-chain/p2p/types/object_mapping_test.go b/beacon-chain/p2p/types/object_mapping_test.go index 119c320d280f..42a186e61b1c 100644 --- a/beacon-chain/p2p/types/object_mapping_test.go +++ b/beacon-chain/p2p/types/object_mapping_test.go @@ -5,7 +5,9 @@ import ( "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/testing/assert" + "github.com/prysmaticlabs/prysm/v5/testing/require" ) func TestInitializeDataMaps(t *testing.T) { @@ -44,14 +46,36 @@ func TestInitializeDataMaps(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.action() - _, ok := BlockMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + bFunc, ok := BlockMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] assert.Equal(t, tt.exists, ok) - _, ok = MetaDataMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + if tt.exists { + b, err := bFunc() + require.NoError(t, err) + generic, err := b.PbGenericBlock() + require.NoError(t, err) + assert.NotNil(t, generic.GetPhase0()) + } + mdFunc, ok := MetaDataMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + if tt.exists { + md, err := mdFunc() + require.NoError(t, err) + assert.NotNil(t, md.MetadataObjV0()) + } assert.Equal(t, tt.exists, ok) - _, ok = AttestationMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + attFunc, ok := AttestationMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + if tt.exists { + att, err := attFunc() + require.NoError(t, err) + assert.Equal(t, version.Phase0, att.Version()) + } assert.Equal(t, tt.exists, ok) - _, ok = AggregateAttestationMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] + aggFunc, ok := AggregateAttestationMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)] assert.Equal(t, tt.exists, ok) + if tt.exists { + agg, err := aggFunc() + require.NoError(t, err) + assert.Equal(t, version.Phase0, agg.Version()) + } }) } } diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index d8d0e1069547..5a986935e604 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -56,23 +56,24 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err return nil, errors.Errorf("message of %T does not support marshaller interface", base) } // Handle different message types across forks. - if topic == p2p.BlockSubnetTopicFormat { + switch topic { + case p2p.BlockSubnetTopicFormat: m, err = extractDataType(types.BlockMap, fDigest[:], s.cfg.clock) if err != nil { return nil, err } - } - if topic == p2p.AttestationSubnetTopicFormat { + case p2p.AttestationSubnetTopicFormat: m, err = extractDataType(types.AttestationMap, fDigest[:], s.cfg.clock) if err != nil { return nil, err } - } - if topic == p2p.AggregateAndProofSubnetTopicFormat { + case p2p.AggregateAndProofSubnetTopicFormat: m, err = extractDataType(types.AggregateAttestationMap, fDigest[:], s.cfg.clock) if err != nil { return nil, err } + default: + return nil, fmt.Errorf("topic %s is not supported", topic) } if err := s.cfg.p2p.Encoding().DecodeGossip(msg.Data, m); err != nil { return nil, err diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index 42252d03acd0..6a7e7077a116 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -459,7 +459,7 @@ func TestSavePendingAtts_BeyondLimit(t *testing.T) { assert.Equal(t, 0, len(s.blkRootToPendingAtts[r2]), "Saved pending atts") } -func Test_attsAreEqual_committee(t *testing.T) { +func Test_attsAreEqual_Committee(t *testing.T) { t.Run("Phase 0 equal", func(t *testing.T) { att1 := ðpb.SignedAggregateAttestationAndProof{ Message: ðpb.AggregateAttestationAndProof{ diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index fde3b718c4db..8c753b6eb7d8 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -180,9 +180,9 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed a, ok := aggregate.(*ethpb.AttestationElectra) // This will never fail in practice because we asserted the version if !ok { - err := errors.New("attestation has wrong type") + err := fmt.Errorf("aggregate attestation has wrong type (expected %T, got %T)", ðpb.AttestationElectra{}, aggregate) tracing.AnnotateError(span, err) - return pubsub.ValidationReject, err + return pubsub.ValidationIgnore, err } committeeIndex, result, err = validateCommitteeIndexElectra(ctx, a) if result != pubsub.ValidationAccept { diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index 2b7e4a964dc8..8df870d95c64 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -98,9 +98,9 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p a, ok := att.(*eth.AttestationElectra) // This will never fail in practice because we asserted the version if !ok { - err := errors.New("attestation has wrong type") + err := fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.AttestationElectra{}, att) tracing.AnnotateError(span, err) - return pubsub.ValidationReject, err + return pubsub.ValidationIgnore, err } committeeIndex, validationRes, err = validateCommitteeIndexElectra(ctx, a) if validationRes != pubsub.ValidationAccept { @@ -169,7 +169,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p a, ok := att.(*eth.Attestation) // This will never fail in practice because we asserted the version if !ok { - return pubsub.ValidationReject, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.Attestation{}, att) + return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.Attestation{}, att) } s.savePendingAtt(ð.SignedAggregateAttestationAndProof{Message: ð.AggregateAttestationAndProof{Aggregate: a}}) } From 71b3d0506968d33838d942e961c7f53f3502b09f Mon Sep 17 00:00:00 2001 From: rkapka Date: Mon, 17 Jun 2024 13:18:02 +0200 Subject: [PATCH 06/10] fix ineffectual assignment --- beacon-chain/sync/decode_pubsub.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index 5a986935e604..f24479c9f5f7 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -51,10 +51,11 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err if base == nil { return nil, p2p.ErrMessageNotMapped } - m, ok := proto.Clone(base).(ssz.Unmarshaler) + _, ok := proto.Clone(base).(ssz.Unmarshaler) if !ok { return nil, errors.Errorf("message of %T does not support marshaller interface", base) } + var m ssz.Unmarshaler // Handle different message types across forks. switch topic { case p2p.BlockSubnetTopicFormat: From 7a599058b5b32712a4fec42a4b88da1c91daef60 Mon Sep 17 00:00:00 2001 From: rkapka Date: Mon, 17 Jun 2024 15:44:03 +0200 Subject: [PATCH 07/10] fix pubsub --- beacon-chain/sync/decode_pubsub.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index f24479c9f5f7..5bacce4a9dad 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -51,11 +51,10 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err if base == nil { return nil, p2p.ErrMessageNotMapped } - _, ok := proto.Clone(base).(ssz.Unmarshaler) + m, ok := proto.Clone(base).(ssz.Unmarshaler) if !ok { return nil, errors.Errorf("message of %T does not support marshaller interface", base) } - var m ssz.Unmarshaler // Handle different message types across forks. switch topic { case p2p.BlockSubnetTopicFormat: @@ -73,8 +72,6 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err if err != nil { return nil, err } - default: - return nil, fmt.Errorf("topic %s is not supported", topic) } if err := s.cfg.p2p.Encoding().DecodeGossip(msg.Data, m); err != nil { return nil, err From 2ec7e96781ef0c89e2e28413fbae9469ce476ca4 Mon Sep 17 00:00:00 2001 From: rkapka Date: Tue, 18 Jun 2024 14:50:20 +0200 Subject: [PATCH 08/10] update ComputeSubnetForAttestation --- beacon-chain/core/helpers/attestation.go | 9 ++++ beacon-chain/core/helpers/attestation_test.go | 44 +++++++++++++------ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/beacon-chain/core/helpers/attestation.go b/beacon-chain/core/helpers/attestation.go index 0e9baf993235..807743e4f893 100644 --- a/beacon-chain/core/helpers/attestation.go +++ b/beacon-chain/core/helpers/attestation.go @@ -10,6 +10,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/crypto/hash" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/runtime/version" prysmTime "github.com/prysmaticlabs/prysm/v5/time" "github.com/prysmaticlabs/prysm/v5/time/slots" ) @@ -91,6 +92,14 @@ func IsAggregated(attestation ethpb.Att) bool { // // return uint64((committees_since_epoch_start + committee_index) % ATTESTATION_SUBNET_COUNT) func ComputeSubnetForAttestation(activeValCount uint64, att ethpb.Att) uint64 { + if att.Version() >= version.Electra { + committeeIndex := 0 + committeeIndices := att.CommitteeBitsVal().BitIndices() + if len(committeeIndices) > 0 { + committeeIndex = committeeIndices[0] + } + return ComputeSubnetFromCommitteeAndSlot(activeValCount, primitives.CommitteeIndex(committeeIndex), att.GetData().Slot) + } return ComputeSubnetFromCommitteeAndSlot(activeValCount, att.GetData().CommitteeIndex, att.GetData().Slot) } diff --git a/beacon-chain/core/helpers/attestation_test.go b/beacon-chain/core/helpers/attestation_test.go index e4a84ae42233..7004c9510aa2 100644 --- a/beacon-chain/core/helpers/attestation_test.go +++ b/beacon-chain/core/helpers/attestation_test.go @@ -73,21 +73,37 @@ func TestAttestation_ComputeSubnetForAttestation(t *testing.T) { RandaoMixes: make([][]byte, params.BeaconConfig().EpochsPerHistoricalVector), }) require.NoError(t, err) - att := ðpb.Attestation{ - AggregationBits: []byte{'A'}, - Data: ðpb.AttestationData{ - Slot: 34, - CommitteeIndex: 4, - BeaconBlockRoot: []byte{'C'}, - Source: nil, - Target: nil, - }, - Signature: []byte{'B'}, - } - valCount, err := helpers.ActiveValidatorCount(context.Background(), state, slots.ToEpoch(att.Data.Slot)) + valCount, err := helpers.ActiveValidatorCount(context.Background(), state, slots.ToEpoch(34)) require.NoError(t, err) - sub := helpers.ComputeSubnetForAttestation(valCount, att) - assert.Equal(t, uint64(6), sub, "Did not get correct subnet for attestation") + + t.Run("Phase 0", func(t *testing.T) { + att := ðpb.Attestation{ + AggregationBits: []byte{'A'}, + Data: ðpb.AttestationData{ + Slot: 34, + CommitteeIndex: 4, + BeaconBlockRoot: []byte{'C'}, + }, + Signature: []byte{'B'}, + } + sub := helpers.ComputeSubnetForAttestation(valCount, att) + assert.Equal(t, uint64(6), sub, "Did not get correct subnet for attestation") + }) + t.Run("Electra", func(t *testing.T) { + cb := primitives.NewAttestationCommitteeBits() + cb.SetBitAt(4, true) + att := ðpb.AttestationElectra{ + AggregationBits: []byte{'A'}, + CommitteeBits: cb, + Data: ðpb.AttestationData{ + Slot: 34, + BeaconBlockRoot: []byte{'C'}, + }, + Signature: []byte{'B'}, + } + sub := helpers.ComputeSubnetForAttestation(valCount, att) + assert.Equal(t, uint64(6), sub, "Did not get correct subnet for attestation") + }) } func Test_ValidateAttestationTime(t *testing.T) { From 26ab15493d7e95c17d7509c81802f2dc25cef3f2 Mon Sep 17 00:00:00 2001 From: rkapka Date: Thu, 20 Jun 2024 14:12:03 +0200 Subject: [PATCH 09/10] review --- beacon-chain/sync/decode_pubsub.go | 37 ++++++++++++----------- beacon-chain/sync/decode_pubsub_test.go | 6 ++-- beacon-chain/sync/rpc_chunked_response.go | 4 +-- beacon-chain/sync/rpc_metadata.go | 2 +- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index 5bacce4a9dad..3c05b871cbb4 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" @@ -56,22 +57,12 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err return nil, errors.Errorf("message of %T does not support marshaller interface", base) } // Handle different message types across forks. - switch topic { - case p2p.BlockSubnetTopicFormat: - m, err = extractDataType(types.BlockMap, fDigest[:], s.cfg.clock) - if err != nil { - return nil, err - } - case p2p.AttestationSubnetTopicFormat: - m, err = extractDataType(types.AttestationMap, fDigest[:], s.cfg.clock) - if err != nil { - return nil, err - } - case p2p.AggregateAndProofSubnetTopicFormat: - m, err = extractDataType(types.AggregateAttestationMap, fDigest[:], s.cfg.clock) - if err != nil { - return nil, err - } + dt, err := extractDataTypeFromTopic(topic, fDigest[:], s.cfg.clock) + if err != nil { + return nil, err + } + if dt != nil { + m = dt } if err := s.cfg.p2p.Encoding().DecodeGossip(msg.Data, m); err != nil { return nil, err @@ -89,7 +80,19 @@ func (*Service) replaceForkDigest(topic string) (string, error) { return strings.Join(subStrings, "/"), nil } -func extractDataType[T any](typeMap map[[4]byte]func() (T, error), digest []byte, tor blockchain.TemporalOracle) (T, error) { +func extractDataTypeFromTopic(topic string, digest []byte, clock *startup.Clock) (ssz.Unmarshaler, error) { + switch topic { + case p2p.BlockSubnetTopicFormat: + return extractDataTypeFromTypeMap(types.BlockMap, digest, clock) + case p2p.AttestationSubnetTopicFormat: + return extractDataTypeFromTypeMap(types.AttestationMap, digest, clock) + case p2p.AggregateAndProofSubnetTopicFormat: + return extractDataTypeFromTypeMap(types.AggregateAttestationMap, digest, clock) + } + return nil, nil +} + +func extractDataTypeFromTypeMap[T any](typeMap map[[4]byte]func() (T, error), digest []byte, tor blockchain.TemporalOracle) (T, error) { var zero T if len(digest) == 0 { diff --git a/beacon-chain/sync/decode_pubsub_test.go b/beacon-chain/sync/decode_pubsub_test.go index cb22ece372b7..1638a2305890 100644 --- a/beacon-chain/sync/decode_pubsub_test.go +++ b/beacon-chain/sync/decode_pubsub_test.go @@ -281,7 +281,7 @@ func TestExtractDataType(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotBlock, err := extractDataType(types.BlockMap, tt.args.digest, tt.args.chain) + gotBlock, err := extractDataTypeFromTypeMap(types.BlockMap, tt.args.digest, tt.args.chain) if (err != nil) != tt.wantErr { t.Errorf("block: error = %v, wantErr %v", err, tt.wantErr) return @@ -289,7 +289,7 @@ func TestExtractDataType(t *testing.T) { if !reflect.DeepEqual(gotBlock, tt.wantBlock) { t.Errorf("block: got = %v, want %v", gotBlock, tt.wantBlock) } - gotAtt, err := extractDataType(types.AttestationMap, tt.args.digest, tt.args.chain) + gotAtt, err := extractDataTypeFromTypeMap(types.AttestationMap, tt.args.digest, tt.args.chain) if (err != nil) != tt.wantErr { t.Errorf("attestation: error = %v, wantErr %v", err, tt.wantErr) return @@ -297,7 +297,7 @@ func TestExtractDataType(t *testing.T) { if !reflect.DeepEqual(gotAtt, tt.wantAtt) { t.Errorf("attestation: got = %v, want %v", gotAtt, tt.wantAtt) } - gotAggregate, err := extractDataType(types.AggregateAttestationMap, tt.args.digest, tt.args.chain) + gotAggregate, err := extractDataTypeFromTypeMap(types.AggregateAttestationMap, tt.args.digest, tt.args.chain) if (err != nil) != tt.wantErr { t.Errorf("aggregate: error = %v, wantErr %v", err, tt.wantErr) return diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index 167b1c7db856..6eac6fc8ff3d 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -105,7 +105,7 @@ func readFirstChunkedBlock(stream libp2pcore.Stream, tor blockchain.TemporalOrac if err != nil { return nil, err } - blk, err := extractDataType(types.BlockMap, rpcCtx, tor) + blk, err := extractDataTypeFromTypeMap(types.BlockMap, rpcCtx, tor) if err != nil { return nil, err } @@ -129,7 +129,7 @@ func readResponseChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, if err != nil { return nil, err } - blk, err := extractDataType(types.BlockMap, rpcCtx, tor) + blk, err := extractDataTypeFromTypeMap(types.BlockMap, rpcCtx, tor) if err != nil { return nil, err } diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index a9e8cef081b7..65fb0003d896 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -108,7 +108,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (metadata if err != nil { return nil, err } - msg, err := extractDataType(types.MetaDataMap, rpcCtx[:], s.cfg.clock) + msg, err := extractDataTypeFromTypeMap(types.MetaDataMap, rpcCtx[:], s.cfg.clock) if err != nil { return nil, err } From 7729c622c60531d22d978ee0744a8e3228201e7b Mon Sep 17 00:00:00 2001 From: rkapka Date: Mon, 24 Jun 2024 15:20:43 +0200 Subject: [PATCH 10/10] review --- beacon-chain/sync/decode_pubsub.go | 4 ++-- beacon-chain/sync/validate_beacon_attestation.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index 3c05b871cbb4..a0e6070149c2 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -57,7 +57,7 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err return nil, errors.Errorf("message of %T does not support marshaller interface", base) } // Handle different message types across forks. - dt, err := extractDataTypeFromTopic(topic, fDigest[:], s.cfg.clock) + dt, err := extractValidDataTypeFromTopic(topic, fDigest[:], s.cfg.clock) if err != nil { return nil, err } @@ -80,7 +80,7 @@ func (*Service) replaceForkDigest(topic string) (string, error) { return strings.Join(subStrings, "/"), nil } -func extractDataTypeFromTopic(topic string, digest []byte, clock *startup.Clock) (ssz.Unmarshaler, error) { +func extractValidDataTypeFromTopic(topic string, digest []byte, clock *startup.Clock) (ssz.Unmarshaler, error) { switch topic { case p2p.BlockSubnetTopicFormat: return extractDataTypeFromTypeMap(types.BlockMap, digest, clock) diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index 8df870d95c64..f72477031667 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -35,8 +35,6 @@ import ( // - attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot). // - The signature of attestation is valid. func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { - var validationRes pubsub.ValidationResult - if pid == s.cfg.p2p.PeerID() { return pubsub.ValidationAccept, nil } @@ -93,6 +91,8 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p return pubsub.ValidationReject, err } + var validationRes pubsub.ValidationResult + var committeeIndex primitives.CommitteeIndex if att.Version() >= version.Electra { a, ok := att.(*eth.AttestationElectra) @@ -162,7 +162,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p a, ok := att.(*eth.AttestationElectra) // This will never fail in practice because we asserted the version if !ok { - return pubsub.ValidationReject, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.AttestationElectra{}, att) + return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", ð.AttestationElectra{}, att) } s.savePendingAtt(ð.SignedAggregateAttestationAndProofElectra{Message: ð.AggregateAttestationAndProofElectra{Aggregate: a}}) } else {