Skip to content

Commit

Permalink
Fix Network Encoder (#5435)
Browse files Browse the repository at this point in the history
* add new block encoding for snappy
* Merge branch 'master' into fixEncoder
* flip flag
* Merge branch 'fixEncoder' of https://github.com/prysmaticlabs/geth-sharding into fixEncoder
* not an underscore
  • Loading branch information
nisdas authored Apr 15, 2020
1 parent 24fc67a commit 9213169
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 7 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
span.AddAttributes(trace.StringAttribute("topic", topic))

buf := new(bytes.Buffer)
if _, err := s.Encoding().Encode(buf, msg); err != nil {
if _, err := s.Encoding().EncodeGossip(buf, msg); err != nil {
err := errors.Wrap(err, "could not encode message")
traceutil.AnnotateError(span, err)
return err
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (

"github.com/gogo/protobuf/proto"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
testpb "github.com/prysmaticlabs/prysm/proto/testing"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
testpb "github.com/prysmaticlabs/prysm/proto/testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
)

Expand Down Expand Up @@ -67,7 +67,7 @@ func TestService_Broadcast(t *testing.T) {
}

result := &testpb.TestSimpleMessage{}
if err := p.Encoding().Decode(incomingMessage.Data, result); err != nil {
if err := p.Encoding().DecodeGossip(incomingMessage.Data, result); err != nil {
tt.Fatal(err)
}
if !proto.Equal(result, msg) {
Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/p2p/encoder/network_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ const (

// NetworkEncoding represents an encoder compatible with Ethereum 2.0 p2p.
type NetworkEncoding interface {
// Decodes to the provided message. The interface must be a pointer to the decoding destination.
// Decode to the provided message. The interface must be a pointer to the decoding destination.
Decode([]byte, interface{}) error
// DecodeGossip to the provided gossip message. The interface must be a pointer to the decoding destination.
DecodeGossip([]byte, interface{}) error
// DecodeWithLength a bytes from a reader with a varint length prefix. The interface must be a pointer to the
// decoding destination.
DecodeWithLength(io.Reader, interface{}) error
Expand All @@ -22,6 +24,8 @@ type NetworkEncoding interface {
DecodeWithMaxLength(io.Reader, interface{}, uint64) error
// Encode an arbitrary message to the provided writer. The interface must be a pointer object to encode.
Encode(io.Writer, interface{}) (int, error)
// EncodeGossip an arbitrary gossip message to the provided writer. The interface must be a pointer object to encode.
EncodeGossip(io.Writer, interface{}) (int, error)
// EncodeWithLength an arbitrary message to the provided writer with a varint length prefix. The interface must be
// a pointer object to encode.
EncodeWithLength(io.Writer, interface{}) (int, error)
Expand Down
27 changes: 27 additions & 0 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ func (e SszNetworkEncoder) Encode(w io.Writer, msg interface{}) (int, error) {
return w.Write(b)
}

// EncodeGossip the proto gossip message to the io.Writer.
func (e SszNetworkEncoder) EncodeGossip(w io.Writer, msg interface{}) (int, error) {
if msg == nil {
return 0, nil
}
b, err := e.doEncode(msg)
if err != nil {
return 0, err
}
if e.UseSnappyCompression {
b = snappy.Encode(nil /*dst*/, b)
}
return w.Write(b)
}

// EncodeWithLength the proto message to the io.Writer. This encoding prefixes the byte slice with a protobuf varint
// to indicate the size of the message.
func (e SszNetworkEncoder) EncodeWithLength(w io.Writer, msg interface{}) (int, error) {
Expand Down Expand Up @@ -105,6 +120,18 @@ func (e SszNetworkEncoder) Decode(b []byte, to interface{}) error {
return e.doDecode(b, to)
}

// DecodeGossip decodes the bytes to the protobuf gossip message provided.
func (e SszNetworkEncoder) DecodeGossip(b []byte, to interface{}) error {
if e.UseSnappyCompression {
var err error
b, err = snappy.Decode(nil /*dst*/, b)
if err != nil {
return err
}
}
return e.doDecode(b, to)
}

// DecodeWithLength the bytes from io.Reader to the protobuf message provided.
func (e SszNetworkEncoder) DecodeWithLength(r io.Reader, to interface{}) error {
return e.DecodeWithMaxLength(r, to, MaxChunkSize)
Expand Down
22 changes: 22 additions & 0 deletions beacon-chain/p2p/encoder/ssz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ func TestSszNetworkEncoder_RoundTrip(t *testing.T) {
e := &encoder.SszNetworkEncoder{UseSnappyCompression: false}
testRoundTrip(t, e)
testRoundTripWithLength(t, e)
testRoundTripWithGossip(t, e)
}

func TestSszNetworkEncoder_RoundTrip_Snappy(t *testing.T) {
e := &encoder.SszNetworkEncoder{UseSnappyCompression: true}
testRoundTrip(t, e)
testRoundTripWithLength(t, e)
testRoundTripWithGossip(t, e)
}

func testRoundTrip(t *testing.T, e *encoder.SszNetworkEncoder) {
Expand Down Expand Up @@ -63,6 +65,26 @@ func testRoundTripWithLength(t *testing.T, e *encoder.SszNetworkEncoder) {
}
}

func testRoundTripWithGossip(t *testing.T, e *encoder.SszNetworkEncoder) {
buf := new(bytes.Buffer)
msg := &testpb.TestSimpleMessage{
Foo: []byte("fooooo"),
Bar: 9001,
}
_, err := e.EncodeGossip(buf, msg)
if err != nil {
t.Fatal(err)
}
decoded := &testpb.TestSimpleMessage{}
if err := e.DecodeGossip(buf.Bytes(), decoded); err != nil {
t.Fatal(err)
}
if !proto.Equal(decoded, msg) {
t.Logf("decoded=%+v\n", decoded)
t.Error("Decoded message is not the same as original")
}
}

func TestSszNetworkEncoder_EncodeWithMaxLength(t *testing.T) {
buf := new(bytes.Buffer)
msg := &testpb.TestSimpleMessage{
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
time.Sleep(time.Millisecond * 100)

buf := new(bytes.Buffer)
if _, err := p.Encoding().Encode(buf, msg); err != nil {
if _, err := p.Encoding().EncodeGossip(buf, msg); err != nil {
p.t.Fatalf("Failed to encode message: %v", err)
}
digest, err := p.ForkDigest()
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/decode_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (r *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error
return nil, fmt.Errorf("no message mapped for topic %s", topic)
}
m := proto.Clone(base)
if err := r.p2p.Encoding().Decode(msg.Data, m); err != nil {
if err := r.p2p.Encoding().DecodeGossip(msg.Data, m); err != nil {
return nil, err
}
return m, nil
Expand Down
2 changes: 1 addition & 1 deletion shared/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ var (
P2PEncoding = &cli.StringFlag{
Name: "p2p-encoding",
Usage: "The encoding format of messages sent over the wire. The default is 0, which represents ssz",
Value: "ssz",
Value: "ssz-snappy",
}
// ForceClearDB removes any previously stored data at the data directory.
ForceClearDB = &cli.BoolFlag{
Expand Down

0 comments on commit 9213169

Please sign in to comment.