diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 60b6d2db8767..be0a66c86699 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" + ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" @@ -20,7 +21,6 @@ import ( fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" - enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" testpb "github.com/prysmaticlabs/prysm/v5/proto/testing" "github.com/prysmaticlabs/prysm/v5/testing/assert" @@ -28,6 +28,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/testing/util" "github.com/prysmaticlabs/prysm/v5/testing/util/random" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" ) func TestService_Broadcast(t *testing.T) { @@ -524,6 +525,22 @@ func TestService_BroadcastBlob(t *testing.T) { } func TestService_BroadcastExecutionPayloadHeader(t *testing.T) { + msg := random.SignedExecutionPayloadHeader(t) + testBroadcast(t, SignedExecutionPayloadHeaderTopicFormat, msg) +} + +func TestService_BroadcastExecutionPayloadEnvelope(t *testing.T) { + msg := random.SignedExecutionPayloadEnvelope(t) + testBroadcast(t, SignedExecutionPayloadEnvelopeTopicFormat, msg) +} + +func TestService_BroadcastPayloadAttestationMessage(t *testing.T) { + msg := random.PayloadAttestationMessage(t) + testBroadcast(t, PayloadAttestationMessageTopicFormat, msg) +} + +func testBroadcast(t *testing.T, topicFormat string, msg interface{}) { + // Create two peers and let them connect. p1 := p2ptest.NewTestP2P(t) p2 := p2ptest.NewTestP2P(t) p1.Connect(p2) @@ -532,31 +549,29 @@ func TestService_BroadcastExecutionPayloadHeader(t *testing.T) { t.Fatal("No peers") } - p := &Service{ + // Create a `Service` for the first peer. + s1 := &Service{ host: p1.BHost, pubsub: p1.PubSub(), joinedTopics: map[string]*pubsub.Topic{}, cfg: &Config{}, genesisTime: time.Now(), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), + peers: peers.NewStatus(context.Background(), &peers.StatusConfig{ + ScorerParams: &scorers.Config{}, + }), } - msg := random.SignedExecutionPayloadHeader(t) - - // External peer subscribes to the topic. - topic := SignedExecutionPayloadHeaderTopicFormat - GossipTypeMapping[reflect.TypeOf(msg)] = topic - - digest, err := p.currentForkDigest() + // The second peer subscribes to the topic. + digest, err := s1.currentForkDigest() require.NoError(t, err) - - topic = fmt.Sprintf("%s%s", fmt.Sprintf(topic, digest), p.Encoding().ProtocolSuffix()) - sub, err := p2.SubscribeToTopic(topic) + topic := fmt.Sprintf(topicFormat, digest) + s1.Encoding().ProtocolSuffix() + subscription, err := p2.SubscribeToTopic(topic) require.NoError(t, err) - time.Sleep(50 * time.Millisecond) // Necessary delay for libp2p. + time.Sleep(50 * time.Millisecond) // Wait for libp2p to be set up. - // Async listen for the pubsub, must be before the broadcast. + // Start a goroutine listening for a pubsub message. var wg sync.WaitGroup wg.Add(1) go func() { @@ -564,21 +579,22 @@ func TestService_BroadcastExecutionPayloadHeader(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - incomingMessage, err := sub.Next(ctx) + incomingMessage, err := subscription.Next(ctx) require.NoError(t, err) - // Same message received from other peer. - result := &enginev1.SignedExecutionPayloadHeader{} - require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result)) + result := msg.(ssz.Unmarshaler) + require.NoError(t, s1.Encoding().DecodeGossip(incomingMessage.Data, result)) require.DeepEqual(t, result, msg) }() - // Unknown message to broadcast. + // An attempt to broadcast a message unmapped to a topic should fail. ctx := context.Background() - require.ErrorContains(t, "message type is not mapped to a PubSub topic", p.Broadcast(ctx, nil)) + require.ErrorContains(t, "message type is not mapped to a PubSub topic", s1.Broadcast(ctx, nil)) - // Broadcast to second peer and wait. - require.NoError(t, p.Broadcast(context.Background(), msg)) + // The first peer broadcasts the message to the second peer. + require.NoError(t, s1.Broadcast(ctx, msg.(protoreflect.ProtoMessage))) + + // Wait for one second for the message to be delivered and processed. if util.WaitTimeout(&wg, 1*time.Second) { t.Error("Failed to receive pubsub within 1s") } diff --git a/beacon-chain/p2p/gossip_topic_mappings.go b/beacon-chain/p2p/gossip_topic_mappings.go index ebafa3e0e0ee..61cc42b7b1b3 100644 --- a/beacon-chain/p2p/gossip_topic_mappings.go +++ b/beacon-chain/p2p/gossip_topic_mappings.go @@ -24,6 +24,8 @@ var gossipTopicMappings = map[string]func() proto.Message{ BlsToExecutionChangeSubnetTopicFormat: func() proto.Message { return ðpb.SignedBLSToExecutionChange{} }, BlobSubnetTopicFormat: func() proto.Message { return ðpb.BlobSidecar{} }, SignedExecutionPayloadHeaderTopicFormat: func() proto.Message { return &enginev1.SignedExecutionPayloadHeader{} }, + SignedExecutionPayloadEnvelopeTopicFormat: func() proto.Message { return &enginev1.SignedExecutionPayloadEnvelope{} }, + PayloadAttestationMessageTopicFormat: func() proto.Message { return ðpb.PayloadAttestationMessage{} }, } // GossipTopicMappings is a function to return the assigned data type @@ -108,4 +110,6 @@ func init() { GossipTypeMapping[reflect.TypeOf(ðpb.SignedAggregateAttestationAndProofElectra{})] = AggregateAndProofSubnetTopicFormat // Handle ePBS objects. GossipTypeMapping[reflect.TypeOf(&enginev1.SignedExecutionPayloadHeader{})] = SignedExecutionPayloadHeaderTopicFormat + GossipTypeMapping[reflect.TypeOf(&enginev1.SignedExecutionPayloadEnvelope{})] = SignedExecutionPayloadEnvelopeTopicFormat + GossipTypeMapping[reflect.TypeOf(ðpb.PayloadAttestationMessage{})] = PayloadAttestationMessageTopicFormat } diff --git a/beacon-chain/p2p/gossip_topic_mappings_test.go b/beacon-chain/p2p/gossip_topic_mappings_test.go index b6f118c277e7..d434b2b56c48 100644 --- a/beacon-chain/p2p/gossip_topic_mappings_test.go +++ b/beacon-chain/p2p/gossip_topic_mappings_test.go @@ -136,4 +136,10 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) { pMessage = GossipTopicMappings(SignedExecutionPayloadHeaderTopicFormat, epbsForkEpoch) _, ok = pMessage.(*enginev1.SignedExecutionPayloadHeader) require.Equal(t, true, ok) + pMessage = GossipTopicMappings(SignedExecutionPayloadEnvelopeTopicFormat, epbsForkEpoch) + _, ok = pMessage.(*enginev1.SignedExecutionPayloadEnvelope) + require.Equal(t, true, ok) + pMessage = GossipTopicMappings(PayloadAttestationMessageTopicFormat, epbsForkEpoch) + _, ok = pMessage.(*ethpb.PayloadAttestationMessage) + require.Equal(t, true, ok) } diff --git a/beacon-chain/p2p/topics_epbs.go b/beacon-chain/p2p/topics_epbs.go index 04c9c06a06d9..75cb9be7fea1 100644 --- a/beacon-chain/p2p/topics_epbs.go +++ b/beacon-chain/p2p/topics_epbs.go @@ -1,6 +1,11 @@ package p2p const ( - GossipSignedExecutionPayloadHeader = "signed_execution_payload_header" - SignedExecutionPayloadHeaderTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadHeader + GossipSignedExecutionPayloadHeader = "signed_execution_payload_header" + GossipSignedExecutionPayloadEnvelope = "signed_execution_payload_envelope" + GossipPayloadAttestationMessage = "payload_attestation_message" + + SignedExecutionPayloadHeaderTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadHeader + SignedExecutionPayloadEnvelopeTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadEnvelope + PayloadAttestationMessageTopicFormat = GossipProtocolAndDigest + GossipPayloadAttestationMessage ) diff --git a/testing/util/random/epbs.go b/testing/util/random/epbs.go index 5a72fcc5dbc8..51925c36e6ea 100644 --- a/testing/util/random/epbs.go +++ b/testing/util/random/epbs.go @@ -349,6 +349,15 @@ func PayloadAttestationData(t *testing.T) *ethpb.PayloadAttestationData { } } +// PayloadAttestationMessage creates a random PayloadAttestationMessage for testing purposes. +func PayloadAttestationMessage(t *testing.T) *ethpb.PayloadAttestationMessage { + return ðpb.PayloadAttestationMessage{ + ValidatorIndex: primitives.ValidatorIndex(randomUint64(t)), + Data: PayloadAttestationData(t), + Signature: randomBytes(96, t), + } +} + // SignedExecutionPayloadEnvelope creates a random SignedExecutionPayloadEnvelope for testing purposes. func SignedExecutionPayloadEnvelope(t *testing.T) *enginev1.SignedExecutionPayloadEnvelope { return &enginev1.SignedExecutionPayloadEnvelope{ @@ -432,7 +441,7 @@ func WithdrawalRequest(t *testing.T) *enginev1.WithdrawalRequest { func ConsolidationRequest(t *testing.T) *enginev1.ConsolidationRequest { return &enginev1.ConsolidationRequest{ SourceAddress: randomBytes(20, t), - SourcePubkey: randomBytes(20, t), + SourcePubkey: randomBytes(48, t), TargetPubkey: randomBytes(48, t), } }