Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Goodbye Message When Disconnecting With Peers #5589

Merged
merged 10 commits into from
Apr 23, 2020
21 changes: 12 additions & 9 deletions beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -15,7 +14,8 @@ import (
// AddConnectionHandler adds a callback function which handles the connection with a
// newly added peer. It performs a handshake with that peer by sending a hello request
// and validating the response from the peer.
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error) {
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error,
goodbyeFunc func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
log := log.WithField("peer", conn.RemotePeer().Pretty())
Expand All @@ -28,10 +28,15 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
}
s.peers.Add(nil /* ENR */, conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction)
if len(s.peers.Active()) >= int(s.cfg.MaxPeers) {
log.WithField("reason", "at peer limit").Trace("Ignoring connection request")
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
go func() {
log.WithField("reason", "at peer limit").Trace("Ignoring connection request")
if err := goodbyeFunc(context.Background(), conn.RemotePeer()); err != nil {
log.WithError(err).Error("Unable to send goodbye message to peer")
}
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
}()
return
}
if s.peers.IsBad(conn.RemotePeer()) {
Expand All @@ -52,9 +57,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
"activePeers": len(s.peers.Active()),
})
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := reqFunc(ctx, conn.RemotePeer()); err != nil && err != io.EOF {
if err := reqFunc(context.Background(), conn.RemotePeer()); err != nil && err != io.EOF {
log.WithError(err).Trace("Handshake failed")
if err.Error() == "protocol not supported" {
// This is only to ensure the smooth running of our testnets. This will not be
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type SetStreamHandler interface {

// ConnectionHandler configures p2p to handle connections with a peer.
type ConnectionHandler interface {
AddConnectionHandler(f func(ctx context.Context, id peer.ID) error)
AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, g func(context.Context, peer.ID) error)
AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error)
}

Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func (p *TestP2P) PeerID() peer.ID {
}

// AddConnectionHandler handles the connection with a newly connected peer.
func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error) {
func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error,
g func(context.Context, peer.ID) error) {
p.Host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
Expand Down
20 changes: 20 additions & 0 deletions beacon-chain/sync/rpc_goodbye.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)

const (
Expand Down Expand Up @@ -41,6 +43,24 @@ func (r *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream
return r.p2p.Disconnect(stream.Conn().RemotePeer())
}

func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

stream, err := r.p2p.Send(ctx, &code, p2p.RPCGoodByeTopic, id)
if err != nil {
return err
}
log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")
return nil
}

// sends a goodbye message for a generic error
func (r *Service) sendGenericGoodbyeMessage(ctx context.Context, id peer.ID) error {
return r.sendGoodByeMessage(ctx, codeGenericError, id)
}

func goodbyeMessage(num uint64) string {
reason, ok := goodByes[num]
if ok {
Expand Down
49 changes: 49 additions & 0 deletions beacon-chain/sync/rpc_goodbye_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,52 @@ func TestGoodByeRPCHandler_Disconnects_With_Peer(t *testing.T) {
t.Error("Peer is still not disconnected despite sending a goodbye message")
}
}

func TestSendGoodbye_SendsMessage(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
if len(p1.Host.Network().Peers()) != 1 {
t.Error("Expected peers to be connected")
}

// Set up a head state in the database with data we expect.
d := db.SetupDB(t)
defer db.TeardownDB(t, d)

r := &Service{
db: d,
p2p: p1,
}
failureCode := codeClientShutdown

// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz")
var wg sync.WaitGroup
wg.Add(1)
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := new(uint64)
if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil {
t.Fatal(err)
}
if *out != failureCode {
t.Fatalf("Wanted goodbye code of %d but got %d", failureCode, *out)
}

})

err := r.sendGoodByeMessage(context.Background(), failureCode, p2.Host.ID())
if err != nil {
t.Errorf("Unxpected error: %v", err)
}

if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}

conns := p1.Host.Network().ConnsToPeer(p1.Host.ID())
if len(conns) > 0 {
t.Error("Peer is still not disconnected despite sending a goodbye message")
}
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2
log.WithError(err).Error("Failed to close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
setRPCStreamDeadlines(stream)

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
}
})

p1.AddConnectionHandler(r.sendRPCStatusRequest)
p1.AddConnectionHandler(r.sendRPCStatusRequest, r.sendGenericGoodbyeMessage)
p1.Connect(p2)

if testutil.WaitTimeout(&wg, 1*time.Second) {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *Service) Start() {
panic(err)
}

r.p2p.AddConnectionHandler(r.reValidatePeer)
r.p2p.AddConnectionHandler(r.reValidatePeer, r.sendGenericGoodbyeMessage)
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
r.p2p.AddPingMethod(r.sendPingRequest)
r.processPendingBlocksQueue()
Expand Down