Skip to content

Commit

Permalink
Broadcast signed execution payload header to peer (#14300)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain authored and potuz committed Aug 29, 2024
1 parent 74b95bf commit 3c245f2
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 0 deletions.
4 changes: 4 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"service.go",
"subnets.go",
"topics.go",
"topics_epbs.go",
"utils.go",
"watch_peers.go",
],
Expand Down Expand Up @@ -69,6 +70,7 @@ go_library(
"//monitoring/tracing/trace:go_default_library",
"//network:go_default_library",
"//network/forks:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//runtime:go_default_library",
Expand Down Expand Up @@ -160,13 +162,15 @@ go_test(
"//encoding/bytesutil:go_default_library",
"//network:go_default_library",
"//network/forks:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/testing:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//testing/util/random:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
Expand Down
63 changes: 63 additions & 0 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
testpb "github.com/prysmaticlabs/prysm/v5/proto/testing"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
"github.com/prysmaticlabs/prysm/v5/testing/util/random"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -520,3 +522,64 @@ func TestService_BroadcastBlob(t *testing.T) {
require.NoError(t, p.BroadcastBlob(ctx, subnet, blobSidecar))
require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Second), "Failed to receive pubsub within 1s")
}

func TestService_BroadcastExecutionPayloadHeader(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)

if len(p1.BHost.Network().Peers()) == 0 {
t.Fatal("No peers")
}

p := &Service{
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
cfg: &Config{},
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
}

msg := random.SignedExecutionPayloadHeader(t)

// External peer subscribes to the topic.
topic := SignedExecutionPayloadHeaderTopicFormat
GossipTypeMapping[reflect.TypeOf(msg)] = topic

digest, err := p.currentForkDigest()
require.NoError(t, err)

topic = fmt.Sprintf("%s%s", fmt.Sprintf(topic, digest), p.Encoding().ProtocolSuffix())
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // Necessary delay for libp2p.

// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

incomingMessage, err := sub.Next(ctx)
require.NoError(t, err)

// Same message received from other peer.
result := &enginev1.SignedExecutionPayloadHeader{}
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
require.DeepEqual(t, result, msg)
}()

// Unknown message to broadcast.
ctx := context.Background()
require.ErrorContains(t, "message type is not mapped to a PubSub topic", p.Broadcast(ctx, nil))

// Broadcast to second peer and wait.
require.NoError(t, p.Broadcast(context.Background(), msg))
if util.WaitTimeout(&wg, 1*time.Second) {
t.Error("Failed to receive pubsub within 1s")
}
}
4 changes: 4 additions & 0 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
)
Expand All @@ -22,6 +23,7 @@ var gossipTopicMappings = map[string]func() proto.Message{
SyncCommitteeSubnetTopicFormat: func() proto.Message { return &ethpb.SyncCommitteeMessage{} },
BlsToExecutionChangeSubnetTopicFormat: func() proto.Message { return &ethpb.SignedBLSToExecutionChange{} },
BlobSubnetTopicFormat: func() proto.Message { return &ethpb.BlobSidecar{} },
SignedExecutionPayloadHeaderTopicFormat: func() proto.Message { return &enginev1.SignedExecutionPayloadHeader{} },
}

// GossipTopicMappings is a function to return the assigned data type
Expand Down Expand Up @@ -104,4 +106,6 @@ func init() {
GossipTypeMapping[reflect.TypeOf(&ethpb.AttestationElectra{})] = AttestationSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.AttesterSlashingElectra{})] = AttesterSlashingSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedAggregateAttestationAndProofElectra{})] = AggregateAndProofSubnetTopicFormat
// Handle ePBS objects.
GossipTypeMapping[reflect.TypeOf(&enginev1.SignedExecutionPayloadHeader{})] = SignedExecutionPayloadHeaderTopicFormat
}
10 changes: 10 additions & 0 deletions beacon-chain/p2p/gossip_topic_mappings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)

func TestMappingHasNoDuplicates(t *testing.T) {
Expand All @@ -30,17 +32,20 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) {
capellaForkEpoch := primitives.Epoch(300)
denebForkEpoch := primitives.Epoch(400)
electraForkEpoch := primitives.Epoch(500)
epbsForkEpoch := primitives.Epoch(600)

bCfg.AltairForkEpoch = altairForkEpoch
bCfg.BellatrixForkEpoch = bellatrixForkEpoch
bCfg.CapellaForkEpoch = capellaForkEpoch
bCfg.DenebForkEpoch = denebForkEpoch
bCfg.ElectraForkEpoch = electraForkEpoch
bCfg.EPBSForkEpoch = epbsForkEpoch
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = primitives.Epoch(100)
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.BellatrixForkVersion)] = primitives.Epoch(200)
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.CapellaForkVersion)] = primitives.Epoch(300)
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.DenebForkVersion)] = primitives.Epoch(400)
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.ElectraForkVersion)] = primitives.Epoch(500)
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.EPBSForkVersion)] = primitives.Epoch(600)
params.OverrideBeaconConfig(bCfg)

// Phase 0
Expand Down Expand Up @@ -126,4 +131,9 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) {
pMessage = GossipTopicMappings(AggregateAndProofSubnetTopicFormat, electraForkEpoch)
_, ok = pMessage.(*ethpb.SignedAggregateAttestationAndProofElectra)
assert.Equal(t, true, ok)

// Epbs fork
pMessage = GossipTopicMappings(SignedExecutionPayloadHeaderTopicFormat, epbsForkEpoch)
_, ok = pMessage.(*enginev1.SignedExecutionPayloadHeader)
require.Equal(t, true, ok)
}
6 changes: 6 additions & 0 deletions beacon-chain/p2p/topics_epbs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package p2p

const (
GossipSignedExecutionPayloadHeader = "signed_execution_payload_header"
SignedExecutionPayloadHeaderTopicFormat = GossipProtocolAndDigest + GossipSignedExecutionPayloadHeader
)

0 comments on commit 3c245f2

Please sign in to comment.