diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index ddbbf7b5b105..627bc18cbd82 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "service.go", "subnets.go", "topics.go", + "topics_epbs.go", "utils.go", "watch_peers.go", ], @@ -68,6 +69,7 @@ go_library( "//monitoring/tracing:go_default_library", "//network:go_default_library", "//network/forks:go_default_library", + "//proto/engine/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/metadata:go_default_library", "//runtime:go_default_library", @@ -159,6 +161,7 @@ go_test( "//encoding/bytesutil:go_default_library", "//network:go_default_library", "//network/forks:go_default_library", + "//proto/engine/v1:go_default_library", "//proto/eth/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//proto/testing:go_default_library", @@ -166,6 +169,7 @@ go_test( "//testing/assert:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", + "//testing/util/random:go_default_library", "//time:go_default_library", "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//crypto:go_default_library", diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index aa5253314440..60b6d2db8767 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -20,11 +20,13 @@ import ( fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" testpb "github.com/prysmaticlabs/prysm/v5/proto/testing" "github.com/prysmaticlabs/prysm/v5/testing/assert" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" + "github.com/prysmaticlabs/prysm/v5/testing/util/random" "google.golang.org/protobuf/proto" ) @@ -520,3 +522,64 @@ func TestService_BroadcastBlob(t *testing.T) { require.NoError(t, p.BroadcastBlob(ctx, subnet, blobSidecar)) require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Second), "Failed to receive pubsub within 1s") } + +func TestService_BroadcastExecutionPayloadHeader(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + + if len(p1.BHost.Network().Peers()) == 0 { + t.Fatal("No peers") + } + + p := &Service{ + host: p1.BHost, + pubsub: p1.PubSub(), + joinedTopics: map[string]*pubsub.Topic{}, + cfg: &Config{}, + genesisTime: time.Now(), + genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), + } + + msg := random.SignedExecutionPayloadHeader(t) + + // External peer subscribes to the topic. + topic := SignedExecutionPayloadHeaderTopicFormat + GossipTypeMapping[reflect.TypeOf(msg)] = topic + + digest, err := p.currentForkDigest() + require.NoError(t, err) + + topic = fmt.Sprintf("%s%s", fmt.Sprintf(topic, digest), p.Encoding().ProtocolSuffix()) + sub, err := p2.SubscribeToTopic(topic) + require.NoError(t, err) + + time.Sleep(50 * time.Millisecond) // Necessary delay for libp2p. + + // Async listen for the pubsub, must be before the broadcast. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + incomingMessage, err := sub.Next(ctx) + require.NoError(t, err) + + // Same message received from other peer. + result := &enginev1.SignedExecutionPayloadHeader{} + require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result)) + require.DeepEqual(t, result, msg) + }() + + // Unknown message to broadcast. + ctx := context.Background() + require.ErrorContains(t, "message type is not mapped to a PubSub topic", p.Broadcast(ctx, nil)) + + // Broadcast to second peer and wait. + require.NoError(t, p.Broadcast(context.Background(), msg)) + if util.WaitTimeout(&wg, 1*time.Second) { + t.Error("Failed to receive pubsub within 1s") + } +} diff --git a/beacon-chain/p2p/gossip_topic_mappings.go b/beacon-chain/p2p/gossip_topic_mappings.go index d88a4499ce2b..ebafa3e0e0ee 100644 --- a/beacon-chain/p2p/gossip_topic_mappings.go +++ b/beacon-chain/p2p/gossip_topic_mappings.go @@ -5,6 +5,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "google.golang.org/protobuf/proto" ) @@ -22,6 +23,7 @@ var gossipTopicMappings = map[string]func() proto.Message{ SyncCommitteeSubnetTopicFormat: func() proto.Message { return ðpb.SyncCommitteeMessage{} }, BlsToExecutionChangeSubnetTopicFormat: func() proto.Message { return ðpb.SignedBLSToExecutionChange{} }, BlobSubnetTopicFormat: func() proto.Message { return ðpb.BlobSidecar{} }, + SignedExecutionPayloadHeaderTopicFormat: func() proto.Message { return &enginev1.SignedExecutionPayloadHeader{} }, } // GossipTopicMappings is a function to return the assigned data type @@ -104,4 +106,6 @@ func init() { GossipTypeMapping[reflect.TypeOf(ðpb.AttestationElectra{})] = AttestationSubnetTopicFormat GossipTypeMapping[reflect.TypeOf(ðpb.AttesterSlashingElectra{})] = AttesterSlashingSubnetTopicFormat GossipTypeMapping[reflect.TypeOf(ðpb.SignedAggregateAttestationAndProofElectra{})] = AggregateAndProofSubnetTopicFormat + // Handle ePBS objects. + GossipTypeMapping[reflect.TypeOf(&enginev1.SignedExecutionPayloadHeader{})] = SignedExecutionPayloadHeaderTopicFormat } diff --git a/beacon-chain/p2p/gossip_topic_mappings_test.go b/beacon-chain/p2p/gossip_topic_mappings_test.go index 2c134f425fa6..b6f118c277e7 100644 --- a/beacon-chain/p2p/gossip_topic_mappings_test.go +++ b/beacon-chain/p2p/gossip_topic_mappings_test.go @@ -7,8 +7,10 @@ import ( "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/assert" + "github.com/prysmaticlabs/prysm/v5/testing/require" ) func TestMappingHasNoDuplicates(t *testing.T) { @@ -30,17 +32,20 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) { capellaForkEpoch := primitives.Epoch(300) denebForkEpoch := primitives.Epoch(400) electraForkEpoch := primitives.Epoch(500) + epbsForkEpoch := primitives.Epoch(600) bCfg.AltairForkEpoch = altairForkEpoch bCfg.BellatrixForkEpoch = bellatrixForkEpoch bCfg.CapellaForkEpoch = capellaForkEpoch bCfg.DenebForkEpoch = denebForkEpoch bCfg.ElectraForkEpoch = electraForkEpoch + bCfg.EPBSForkEpoch = epbsForkEpoch bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = primitives.Epoch(100) bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.BellatrixForkVersion)] = primitives.Epoch(200) bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.CapellaForkVersion)] = primitives.Epoch(300) bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.DenebForkVersion)] = primitives.Epoch(400) bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.ElectraForkVersion)] = primitives.Epoch(500) + bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.EPBSForkVersion)] = primitives.Epoch(600) params.OverrideBeaconConfig(bCfg) // Phase 0 @@ -126,4 +131,9 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) { pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, electraForkEpoch) _, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProofElectra) assert.Equal(t, true, ok) + + // Epbs fork + pMessage = GossipTopicMappings(SignedExecutionPayloadHeaderTopicFormat, epbsForkEpoch) + _, ok = pMessage.(*enginev1.SignedExecutionPayloadHeader) + require.Equal(t, true, ok) } diff --git a/beacon-chain/p2p/topics_epbs.go b/beacon-chain/p2p/topics_epbs.go new file mode 100644 index 000000000000..04c9c06a06d9 --- /dev/null +++ b/beacon-chain/p2p/topics_epbs.go @@ -0,0 +1,6 @@ +package p2p + +const ( + GossipSignedExecutionPayloadHeader = "signed_execution_payload_header" + SignedExecutionPayloadHeaderTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadHeader +)